1.9.30 sync.
[gae.git] / python / google / appengine / api / logservice / logservice.py
blob5107b0d26c559436825ec58356d10966b19e5566
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
19 """LogService API.
21 This module allows apps to flush logs, provide status messages, and
22 programmatically access their request and application logs.
23 """
30 from __future__ import with_statement
31 import base64
32 import collections
33 import cStringIO
34 import logging
35 import os
36 import re
37 import sys
38 import threading
39 import time
40 import warnings
42 from google.net.proto import ProtocolBuffer
43 from google.appengine.api import api_base_pb
44 from google.appengine.api import apiproxy_stub_map
45 from google.appengine.api.logservice import log_service_pb
46 from google.appengine.api.logservice import logsutil
47 from google.appengine.datastore import datastore_rpc
48 from google.appengine.runtime import apiproxy_errors
# Whether application logs are automatically flushed during a request.
AUTOFLUSH_ENABLED = True

# Autoflush the buffer once this many seconds have passed since the last flush.
AUTOFLUSH_EVERY_SECONDS = 60

# Autoflush the buffer once it holds at least this many bytes.
AUTOFLUSH_EVERY_BYTES = 4096

# Autoflush the buffer once it holds at least this many lines.
AUTOFLUSH_EVERY_LINES = 50

# Upper bound on the batch_size kwarg accepted by fetch().
MAX_ITEMS_PER_FETCH = 1000

# Application log severities, re-exported from logsutil for caller convenience.
LOG_LEVEL_DEBUG = logsutil.LOG_LEVEL_DEBUG
LOG_LEVEL_INFO = logsutil.LOG_LEVEL_INFO
LOG_LEVEL_WARNING = logsutil.LOG_LEVEL_WARNING
LOG_LEVEL_ERROR = logsutil.LOG_LEVEL_ERROR
LOG_LEVEL_CRITICAL = logsutil.LOG_LEVEL_CRITICAL

# Module ids: 1-63 lowercase letters, digits, or dashes, not starting with
# a dash.
MODULE_ID_RE_STRING = r'(?!-)[a-z\d\-]{1,63}'
# Module versions: same alphabet, up to 100 characters.
MODULE_VERSION_RE_STRING = r'(?!-)[a-z\d\-]{1,100}'
# Matches either 'module:version' or a bare 'version' identifier; group 1 is
# the optional module id, group 2 the version id.
_MAJOR_VERSION_ID_PATTERN = r'^(?:(?:(%s):)?)(%s)$' % (MODULE_ID_RE_STRING,
                                                       MODULE_VERSION_RE_STRING)

_MAJOR_VERSION_ID_RE = re.compile(_MAJOR_VERSION_ID_PATTERN)

# Request log ids are non-empty hexadecimal strings.
_REQUEST_ID_PATTERN = r'^[\da-fA-F]+$'
_REQUEST_ID_RE = re.compile(_REQUEST_ID_PATTERN)
class Error(Exception):
  """Base error class for this module."""
class InvalidArgumentError(Error):
  """Function argument has invalid value."""
class TimeoutError(Error):
  """Requested timeout for fetch() call has expired while iterating results."""

  def __init__(self, msg, offset, last_end_time):
    """Records the resume position that was current when the timeout fired.

    Args:
      msg: Human-readable description of the timeout.
      offset: Encoded offset into the log stream, or None.
      last_end_time: Completion time (seconds since epoch) of the last
        request examined, or None.
    """
    super(TimeoutError, self).__init__(msg)
    self.__offset = offset
    self.__last_end_time = last_end_time

  @property
  def offset(self):
    """Binary offset indicating the current position in the result stream.

    May be submitted to future Log read requests to continue iterating logs
    starting exactly where this iterator left off.

    Returns:
      A byte string representing an offset into the log stream, or None.
    """
    return self.__offset

  @property
  def last_end_time(self):
    """End time of the last request examined prior to the timeout, or None.

    Returns:
      A float representing the completion time in seconds since the Unix
      epoch of the last request examined.
    """
    return self.__last_end_time
class LogsBufferNew(object):
  """Threadsafe buffer for storing and periodically flushing app logs."""

  def __init__(self, stream=None, stderr=False):
    """Initializes the buffer, which wraps an internal buffer or sys.stderr.

    The state of the LogsBuffer is protected by a separate lock. The lock is
    acquired before any variables are mutated or accessed, and released
    afterward. A recursive lock is used so that a single thread can acquire the
    lock multiple times, and release it only when an identical number of
    'unlock()' calls have been performed.

    Args:
      stream: Unused. Left there for backward compatibility.
      stderr: If specified, use sys.stderr as the underlying stream.

    Raises:
      ValueError: if stream is provided.
    """
    if stream is not None:
      raise ValueError('underlying streams are no longer supported')

    self._buffer = collections.deque()
    self._stderr = stderr
    self._lock = threading.RLock()
    self._reset()

  # Maximum number of bytes sent to the Flush API per call; individual log
  # lines are truncated to the same limit before being grouped.
  _MAX_FLUSH_SIZE = 1000 * 1000
  _MAX_LINE_SIZE = _MAX_FLUSH_SIZE

  @staticmethod
  def _truncate(line, max_length=_MAX_LINE_SIZE):
    """Truncates a potentially long log down to a specified maximum length."""
    if len(line) > max_length:
      original_length = len(line)
      # The suffix records the pre-truncation length so it stays visible in
      # the truncated log line.
      suffix = '...(length %d)' % original_length
      line = line[:max_length - len(suffix)] + suffix
    return line

  def stream(self):
    """Returns the underlying file-like object used to buffer logs."""
    if self._stderr:
      return sys.stderr
    # No real underlying stream exists anymore; return a snapshot copy so
    # legacy callers still receive a file-like object.
    return cStringIO.StringIO(self.contents())

  def lines(self):
    """Returns the number of log lines currently buffered."""
    with self._lock:
      return len(self._buffer)

  def bytes(self):
    """Returns the size of the log buffer, in bytes."""
    with self._lock:
      return self._bytes

  def age(self):
    """Returns the number of seconds since the log buffer was flushed."""
    with self._lock:
      return time.time() - self._flush_time

  def flush_time(self):
    """Returns last time that the log buffer was flushed."""
    with self._lock:
      return self._flush_time

  def contents(self):
    """Returns the contents of the logs buffer."""
    with self._lock:
      return self._contents()

  def _contents(self):
    """Internal version of contents() with no locking."""
    return ''.join(self._buffer)

  def reset(self):
    """Resets the buffer state, without clearing the underlying stream."""
    with self._lock:
      return self._reset()

  def _reset(self):
    """Internal version of reset() with no locking."""
    # Recompute the byte count from whatever is still buffered (reset does
    # not drop lines), restart the autoflush clock, and remember the current
    # request id so _write() can detect a request change.
    self._bytes = sum(len(line) for line in self._buffer)
    self._flush_time = time.time()
    self._request = logsutil.RequestID()

  def clear(self):
    """Clears the contents of the logs buffer, and resets autoflush state."""
    with self._lock:
      return self._clear()

  def _clear(self):
    """Internal version of clear() with no locking."""
    self._buffer.clear()
    self._reset()

  def close(self):
    """Closes the underlying stream, flushing the current contents."""
    with self._lock:
      return self._close()

  def _close(self):
    """Internal version of close() with no locking."""
    self._flush()

  def parse_logs(self):
    """Parse the contents of the buffer and return an array of log lines."""
    # NOTE(review): 'line[-1]' assumes no empty string is ever buffered; an
    # empty line would raise IndexError before the trailing 'if line' filter
    # applies — confirm writers never buffer ''.
    without_newlines = (line[:-1] if line[-1] == '\n' else line
                        for line in self._buffer)
    return [logsutil.ParseLogEntry(line) for line in without_newlines if line]

  def write(self, line):
    """Writes a line to the logs buffer."""
    with self._lock:
      return self._write(line)

  def writelines(self, seq):
    """Writes each line in the given sequence to the logs buffer."""
    for line in seq:
      self.write(line)

  def _put_line(self, line):
    """Write the line in the internal buffer for the next flush."""
    self._buffer.append(line)
    self._bytes += len(line)

  def _get_line(self):
    """Get and deque the oldest log line from the internal buffer."""
    line = self._buffer.popleft()
    self._bytes -= len(line)
    return line

  def _rollback_line(self, line):
    """Write back the line as the oldest in the internal buffer."""
    self._buffer.appendleft(line)
    self._bytes += len(line)

  def _write(self, line):
    """Writes a line to the logs buffer."""
    if self._request != logsutil.RequestID():
      # A different request is now being served by this process; reset the
      # bookkeeping so its logs get fresh flush-timing state.
      self._reset()
    if self._stderr:
      sys.stderr.write(line)
    else:
      self._put_line(line)
    self._autoflush()

  def flush(self):
    """Flushes the contents of the logs buffer.

    This method holds the buffer lock until the API call has finished to ensure
    that flush calls are performed in the correct order, so that log messages
    written during the flush call aren't dropped or accidentally wiped, and so
    that the other buffer state variables (flush time, lines, bytes) are updated
    synchronously with the flush.
    """
    with self._lock:
      self._flush()

  def _flush(self):
    """Internal version of flush() with no locking."""
    if self._stderr:
      sys.stderr.flush()
      return

    lines_to_be_flushed = []
    try:
      while True:
        # Build groups of at most _MAX_FLUSH_SIZE encoded bytes and send
        # each group in its own Flush API call.
        group = log_service_pb.UserAppLogGroup()
        bytes_left = LogsBufferNew._MAX_FLUSH_SIZE
        while self._buffer:
          bare_line = self._get_line()

          timestamp_usec, level, message = logsutil.ParseLogEntry(bare_line)

          if message[-1] == '\n':
            message = message[:-1]

          if not message:
            continue

          message = LogsBufferNew._truncate(
              message, LogsBufferNew._MAX_LINE_SIZE)

          # No room for this line in the current group: put it back at the
          # front and flush what has been collected so far.
          if len(message) > bytes_left:
            self._rollback_line(bare_line)
            break

          lines_to_be_flushed.append(bare_line)

          line = group.add_log_line()
          line.set_timestamp_usec(timestamp_usec)
          line.set_level(level)
          line.set_message(message)

          # One byte for the field tag plus the length-delimited payload.
          bytes_left -= 1 + group.lengthString(line.ByteSize())

        request = log_service_pb.FlushRequest()
        request.set_logs(group.Encode())
        response = api_base_pb.VoidProto()
        apiproxy_stub_map.MakeSyncCall('logservice', 'Flush', request, response)
        if not self._buffer:
          break
    except apiproxy_errors.CancelledError:
      # The API call was cancelled (e.g. request teardown): restore the
      # already-dequeued lines to the front of the buffer so none are lost.
      lines_to_be_flushed.reverse()
      self._buffer.extendleft(lines_to_be_flushed)
    except Exception, e:
      lines_to_be_flushed.reverse()
      self._buffer.extendleft(lines_to_be_flushed)
      if not self._stderr:
        line = '-' * 80
        msg = 'ERROR: Could not flush to log_service (%s)\n%s\n%s\n%s\n'
        # Last-ditch effort: dump the buffered logs to stderr before clearing
        # so they are not silently lost.
        sys.stderr.write(msg % (str(e), line, '\n'.join(self._buffer), line))
        self._clear()
      raise
    else:
      self._clear()

  def autoflush(self):
    """Flushes the buffer if certain conditions have been met."""
    with self._lock:
      return self._autoflush()

  def _autoflush(self):
    """Internal version of autoflush() with no locking."""
    if not self.autoflush_enabled():
      return

    # Flush when any enabled threshold (age, line count, byte count) is met.
    if ((AUTOFLUSH_EVERY_SECONDS and self.age() >= AUTOFLUSH_EVERY_SECONDS) or
        (AUTOFLUSH_EVERY_LINES and self.lines() >= AUTOFLUSH_EVERY_LINES) or
        (AUTOFLUSH_EVERY_BYTES and self.bytes() >= AUTOFLUSH_EVERY_BYTES)):
      self._flush()

  def autoflush_enabled(self):
    """Indicates if the buffer will periodically flush logs during a request."""
    return AUTOFLUSH_ENABLED
def logs_buffer():
  """Returns the LogsBuffer used by the current request."""
  # _global_buffer is presumably a process-wide buffer defined elsewhere in
  # this module (not visible in this chunk) — verify against the full file.
  return _global_buffer
def write(message):
  """Adds 'message' to the logs buffer, and checks for autoflush.

  Args:
    message: A message (string) to be written to application logs.
  """
  buf = logs_buffer()
  buf.write(message)
def clear():
  """Clear the logs buffer and reset the autoflush state."""
  buf = logs_buffer()
  buf.clear()
def autoflush():
  """If AUTOFLUSH conditions have been met, performs a Flush API call."""
  buf = logs_buffer()
  buf.autoflush()
def flush():
  """Flushes log lines that are currently buffered."""
  buf = logs_buffer()
  buf.flush()
def flush_time():
  """Returns last time that the logs buffer was flushed."""
  buf = logs_buffer()
  return buf.flush_time()
def log_buffer_age():
  """Returns the number of seconds since the logs buffer was flushed."""
  buf = logs_buffer()
  return buf.age()
def log_buffer_contents():
  """Returns the contents of the logs buffer."""
  buf = logs_buffer()
  return buf.contents()
def log_buffer_bytes():
  """Returns the size of the logs buffer, in bytes."""
  buf = logs_buffer()
  return buf.bytes()
def log_buffer_lines():
  """Returns the number of log lines currently buffered."""
  buf = logs_buffer()
  return buf.lines()
class _LogQueryResult(object):
  """A container that holds a log request and provides an iterator to read logs.

  A _LogQueryResult object is the standard returned item for a call to fetch().
  It is iterable - each value returned is a log that the user has queried for,
  and internally, it holds a cursor that it uses to fetch more results once the
  current, locally held set, are exhausted.

  Properties:
    _request: A LogReadRequest that contains the parameters the user has set for
      the initial fetch call, which will be updated with a more current cursor
      if more logs are requested.
    _logs: A list of RequestLogs corresponding to logs the user has asked for.
    _read_called: A boolean that indicates if a Read call has even been made
      with the request stored in this object.
  """

  def __init__(self, request, timeout=None):
    """Constructor.

    Args:
      request: A LogReadRequest object that will be used for Read calls.
      timeout: Maximum number of seconds iteration may run before raising
        TimeoutError, or None for no deadline.
    """
    self._request = request
    self._logs = []
    self._read_called = False
    self._last_end_time = None
    self._end_time = None
    if timeout is not None:
      self._end_time = time.time() + timeout

  def __iter__(self):
    """Provides an iterator that yields log records one at a time.

    Raises:
      TimeoutError: if a deadline was set and it passes while more results
        remain; the error carries the offset and last_end_time needed to
        resume iteration.
    """
    while True:
      for log_item in self._logs:
        yield RequestLog(log_item)
      # Keep reading while the first Read hasn't happened yet, or the server
      # returned an offset indicating more results are available.
      if not self._read_called or self._request.has_offset():
        if self._end_time and time.time() >= self._end_time:
          offset = None
          if self._request.has_offset():
            offset = self._request.offset().Encode()
          raise TimeoutError('A timeout occurred while iterating results',
                             offset=offset, last_end_time=self._last_end_time)
        self._read_called = True
        self._advance()
      else:
        break

  def _advance(self):
    """Acquires additional logs via cursor.

    This method is used by the iterator when it has exhausted its current set of
    logs to acquire more logs and update its internal structures accordingly.

    Raises:
      InvalidArgumentError: if the backend rejects the request parameters.
      Error: for any other application-level error from the Read call.
    """
    response = log_service_pb.LogReadResponse()

    try:
      apiproxy_stub_map.MakeSyncCall('logservice', 'Read', self._request,
                                     response)
    except apiproxy_errors.ApplicationError, e:
      if e.application_error == log_service_pb.LogServiceError.INVALID_REQUEST:
        raise InvalidArgumentError(e.error_detail)
      raise Error(e.error_detail)

    # Replace the local batch and move the cursor to the server's new offset
    # (clearing it first so an absent offset ends iteration).
    self._logs = response.log_list()
    self._request.clear_offset()
    if response.has_offset():
      self._request.mutable_offset().CopyFrom(response.offset())
    self._last_end_time = None
    if response.has_last_end_time():
      # Wire value is in microseconds; expose seconds since the epoch.
      self._last_end_time = response.last_end_time() / 1e6
class RequestLog(object):
  """Complete log information about a single request to an application."""

  def __init__(self, request_log=None):
    """Initializes from an encoded string, an existing proto, or empty.

    Args:
      request_log: A base64-encoded serialized RequestLog string (the format
        produced by __repr__), a log_service_pb.RequestLog instance, or None
        to start from an empty proto.
    """
    if type(request_log) is str:
      self.__pb = log_service_pb.RequestLog(base64.b64decode(request_log))
    elif request_log.__class__ == log_service_pb.RequestLog:
      self.__pb = request_log
    else:
      self.__pb = log_service_pb.RequestLog()
    # Lazily-built cache of AppLog objects; filled on first access of the
    # app_logs property.
    self.__lines = []

  def __repr__(self):
    # Round-trippable form: the constructor accepts this base64 payload.
    return 'RequestLog(\'%s\')' % base64.b64encode(self.__pb.Encode())

  def __str__(self):
    if self.module_id == 'default':
      return ('<RequestLog(app_id=%s, version_id=%s, request_id=%s)>' %
              (self.app_id, self.version_id, base64.b64encode(self.request_id)))
    else:
      return ('<RequestLog(app_id=%s, module_id=%s, version_id=%s, '
              'request_id=%s)>' %
              (self.app_id, self.module_id, self.version_id,
               base64.b64encode(self.request_id)))

  @property
  def _pb(self):
    # Underlying protocol buffer; exposed for internal use only.
    return self.__pb

  @property
  def app_id(self):
    """Application id that handled this request, as a string."""
    return self.__pb.app_id()

  @property
  def module_id(self):
    """Module id that handled this request, as a string."""
    return self.__pb.module_id()

  @property
  def version_id(self):
    """Version of the application that handled this request, as a string."""
    return self.__pb.version_id()

  @property
  def request_id(self):
    """Globally unique identifier for a request, based on request start time.

    Request ids for requests which started later will compare greater as
    binary strings than those for requests which started earlier.

    Returns:
      A byte string containing a unique identifier for this request.
    """
    return self.__pb.request_id()

  @property
  def offset(self):
    """Binary offset indicating current position in the result stream.

    May be submitted to future Log read requests to continue immediately after
    this request.

    Returns:
      A byte string representing an offset into the active result stream.
    """
    if self.__pb.has_offset():
      return self.__pb.offset().Encode()
    return None

  @property
  def ip(self):
    """The origin IP address of the request, as a string."""
    return self.__pb.ip()

  @property
  def nickname(self):
    """Nickname of the user that made the request if known and logged in.

    Returns:
      A string representation of the logged in user's nickname, or None.
    """
    if self.__pb.has_nickname():
      return self.__pb.nickname()
    return None

  @property
  def start_time(self):
    """Time at which request was known to have begun processing.

    Returns:
      A float representing the time this request began processing in seconds
      since the Unix epoch.
    """
    # Wire value is microseconds; expose seconds.
    return self.__pb.start_time() / 1e6

  @property
  def end_time(self):
    """Time at which request was known to have completed.

    Returns:
      A float representing the request completion time in seconds since the
      Unix epoch.
    """
    return self.__pb.end_time() / 1e6

  @property
  def latency(self):
    """Time required to process request in seconds, as a float."""
    return self.__pb.latency() / 1e6

  @property
  def mcycles(self):
    """Number of machine cycles used to process request, as an integer."""
    return self.__pb.mcycles()

  @property
  def method(self):
    """Request method (GET, PUT, POST, etc), as a string."""
    return self.__pb.method()

  @property
  def resource(self):
    """Resource path on server requested by client.

    For example, http://nowhere.com/app would have a resource string of '/app'.

    Returns:
      A string containing the path component of the request URL.
    """
    return self.__pb.resource()

  @property
  def http_version(self):
    """HTTP version of request, as a string."""
    return self.__pb.http_version()

  @property
  def status(self):
    """Response status of request, as an int."""
    return self.__pb.status()

  @property
  def response_size(self):
    """Size in bytes sent back to client by request, as a long."""
    return self.__pb.response_size()

  @property
  def referrer(self):
    """Referrer URL of request as a string, or None."""
    if self.__pb.has_referrer():
      return self.__pb.referrer()
    return None

  @property
  def user_agent(self):
    """User agent used to make the request as a string, or None."""
    if self.__pb.has_user_agent():
      return self.__pb.user_agent()
    return None

  @property
  def url_map_entry(self):
    """File or class within URL mapping used for request.

    Useful for tracking down the source code which was responsible for managing
    request, especially for multiply mapped handlers.

    Returns:
      A string containing a file or class name.
    """
    return self.__pb.url_map_entry()

  @property
  def combined(self):
    """Apache combined log entry for request.

    The information in this field can be constructed from the rest of
    this message, however, this field is included for convenience.

    Returns:
      A string containing an Apache-style log line in the form documented at
      http://httpd.apache.org/docs/1.3/logs.html.
    """
    return self.__pb.combined()

  @property
  def api_mcycles(self):
    """Number of machine cycles spent in API calls while processing request.

    Deprecated. This value is no longer meaningful.

    Returns:
      Number of API machine cycles used as a long, or None if not available.
    """
    warnings.warn('api_mcycles does not return a meaningful value.',
                  DeprecationWarning, stacklevel=2)
    if self.__pb.has_api_mcycles():
      return self.__pb.api_mcycles()
    return None

  @property
  def host(self):
    """The Internet host and port number of the resource being requested.

    Returns:
      A string representing the host and port receiving the request, or None
      if not available.
    """
    if self.__pb.has_host():
      return self.__pb.host()
    return None

  @property
  def cost(self):
    """The estimated cost of this request, in fractional dollars.

    Returns:
      A float representing an estimated fractional dollar cost of this
      request, or None if not available.
    """
    if self.__pb.has_cost():
      return self.__pb.cost()
    return None

  @property
  def task_queue_name(self):
    """The request's queue name, if generated via the Task Queue API.

    Returns:
      A string containing the request's queue name if relevant, or None.
    """
    if self.__pb.has_task_queue_name():
      return self.__pb.task_queue_name()
    return None

  @property
  def task_name(self):
    """The request's task name, if generated via the Task Queue API.

    Returns:
      A string containing the request's task name if relevant, or None.
    """
    if self.__pb.has_task_name():
      return self.__pb.task_name()
    # NOTE(review): falls through to an implicit None, unlike the sibling
    # properties which return None explicitly; behavior is identical.

  @property
  def was_loading_request(self):
    """Returns whether this request was a loading request for an instance.

    Returns:
      A bool indicating whether this request was a loading request.
    """
    return bool(self.__pb.was_loading_request())

  @property
  def pending_time(self):
    """Time this request spent in the pending request queue.

    Returns:
      A float representing the time in seconds that this request was pending.
    """
    return self.__pb.pending_time() / 1e6

  @property
  def replica_index(self):
    """The module replica that handled the request as an integer, or None."""
    if self.__pb.has_replica_index():
      return self.__pb.replica_index()
    return None

  @property
  def finished(self):
    """Whether or not this log represents a finished request, as a bool."""
    return bool(self.__pb.finished())

  @property
  def instance_key(self):
    """Mostly-unique identifier for the instance that handled the request.

    Returns:
      A string encoding of an instance key if available, or None.
    """
    if self.__pb.has_clone_key():
      return self.__pb.clone_key()
    return None

  @property
  def app_logs(self):
    """Logs emitted by the application while serving this request.

    Returns:
      A list of AppLog objects representing the log lines for this request, or
      an empty list if none were emitted or the query did not request them.
    """
    if not self.__lines and self.__pb.line_size():
      self.__lines = [AppLog(time=line.time() / 1e6, level=line.level(),
                             message=line.log_message())
                      for line in self.__pb.line_list()]
    return self.__lines

  @property
  def app_engine_release(self):
    """App Engine Infrastructure release that served this request.

    Returns:
      A string containing App Engine version that served this request, or None
      if not available.
    """
    if self.__pb.has_app_engine_release():
      return self.__pb.app_engine_release()
    return None
class AppLog(object):
  """Application log line emitted while processing a request."""

  def __init__(self, time=None, level=None, message=None):
    """Constructor.

    Args:
      time: Time the log entry was made, in seconds since the Unix epoch.
      level: Severity of the log entry, as an int.
      message: The application-provided log message, as a string.
    """
    self._time = time
    self._level = level
    self._message = message

  def __eq__(self, other):
    # Bug fix: the level term previously read 'self.level and other.level',
    # testing truthiness instead of equality — two identical DEBUG (level 0)
    # entries never compared equal, and entries differing only in nonzero
    # level compared equal.
    return (self.time == other.time and self.level == other.level and
            self.message == other.message)

  def __ne__(self, other):
    # Python 2 does not derive __ne__ from __eq__; define it so '!=' is
    # consistent with '=='.
    return not self.__eq__(other)

  def __repr__(self):
    return ('AppLog(time=%f, level=%d, message=\'%s\')' %
            (self.time, self.level, self.message))

  @property
  def time(self):
    """Time log entry was made, in seconds since the Unix epoch, as a float."""
    return self._time

  @property
  def level(self):
    """Level or severity of log, as an int."""
    return self._level

  @property
  def message(self):
    """Application-provided log message, as a string."""
    return self._message
# Optional keyword-only arguments accepted by fetch() in addition to its
# named parameters; anything else in **kwargs is rejected.
_FETCH_KWARGS = frozenset(['prototype_request', 'timeout', 'batch_size',
                           'server_versions'])
@datastore_rpc._positional(0)
def fetch(start_time=None,
          end_time=None,
          offset=None,
          minimum_log_level=None,
          include_incomplete=False,
          include_app_logs=False,
          module_versions=None,
          version_ids=None,
          request_ids=None,
          **kwargs):
  """Returns an iterator yielding an application's request and application logs.

  Logs will be returned by the iterator in reverse chronological order by
  request end time, or by last flush time for requests still in progress (if
  requested). The items yielded are RequestLog objects, the contents of which
  are accessible via method calls.

  All parameters are optional.

  Args:
    start_time: The earliest request completion or last-update time that
      results should be fetched for, in seconds since the Unix epoch.
    end_time: The latest request completion or last-update time that
      results should be fetched for, in seconds since the Unix epoch.
    offset: A byte string representing an offset into the log stream, extracted
      from a previously emitted RequestLog. This iterator will begin
      immediately after the record from which the offset came.
    minimum_log_level: An application log level which serves as a filter on the
      requests returned--requests with no application log at or above the
      specified level will be omitted. Works even if include_app_logs is not
      True. In ascending order, the available log levels are:
      logservice.LOG_LEVEL_DEBUG, logservice.LOG_LEVEL_INFO,
      logservice.LOG_LEVEL_WARNING, logservice.LOG_LEVEL_ERROR,
      and logservice.LOG_LEVEL_CRITICAL.
    include_incomplete: Whether or not to include requests that have started but
      not yet finished, as a boolean. Defaults to False.
    include_app_logs: Whether or not to include application level logs in the
      results, as a boolean. Defaults to False.
    module_versions: A list of tuples of the form (module, version), that
      indicate that the logs for the given module/version combination should be
      fetched. Duplicate tuples will be ignored. This kwarg may not be used
      in conjunction with the 'version_ids' kwarg.
    version_ids: A list of version ids whose logs should be queried against.
      Defaults to the application's current version id only. This kwarg may not
      be used in conjunction with the 'module_versions' kwarg.
    request_ids: If not None, indicates that instead of a time-based scan, logs
      for the specified requests should be returned. Malformed request IDs will
      cause the entire request to be rejected, while any requests that are
      unknown will be ignored. This option may not be combined with any
      filtering options such as start_time, end_time, offset, or
      minimum_log_level. version_ids is ignored. IDs that do not correspond to
      a request log will be ignored. Logs will be returned in the order
      requested.

  Returns:
    An iterable object containing the logs that the user has queried for.

  Raises:
    InvalidArgumentError: Raised if any of the input parameters are not of the
      correct type.
  """
  args_diff = set(kwargs) - _FETCH_KWARGS
  if args_diff:
    raise InvalidArgumentError('Invalid arguments: %s' % ', '.join(args_diff))

  request = log_service_pb.LogReadRequest()

  request.set_app_id(os.environ['APPLICATION_ID'])

  if start_time is not None:
    if not isinstance(start_time, (float, int, long)):
      raise InvalidArgumentError('start_time must be a float or integer')
    # Times travel over the wire in microseconds.
    request.set_start_time(long(start_time * 1000000))

  if end_time is not None:
    if not isinstance(end_time, (float, int, long)):
      raise InvalidArgumentError('end_time must be a float or integer')
    request.set_end_time(long(end_time * 1000000))

  if offset is not None:
    try:
      request.mutable_offset().ParseFromString(offset)
    except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
      raise InvalidArgumentError('offset must be a string or read-only buffer')

  if minimum_log_level is not None:
    if not isinstance(minimum_log_level, int):
      raise InvalidArgumentError('minimum_log_level must be an int')

    if minimum_log_level not in logsutil.LOG_LEVELS:
      raise InvalidArgumentError(
          'minimum_log_level must be one of %s' % repr(
              logsutil.LOG_LEVELS))
    request.set_minimum_log_level(minimum_log_level)

  if not isinstance(include_incomplete, bool):
    raise InvalidArgumentError('include_incomplete must be a boolean')
  request.set_include_incomplete(include_incomplete)

  if not isinstance(include_app_logs, bool):
    raise InvalidArgumentError('include_app_logs must be a boolean')
  request.set_include_app_logs(include_app_logs)

  if 'server_versions' in kwargs:
    logging.warning('The server_versions kwarg to the fetch() method is '
                    'deprecated. Please use the module_versions kwarg '
                    'instead.')
    module_versions = kwargs.pop('server_versions')
  if version_ids and module_versions:
    raise InvalidArgumentError('version_ids and module_versions may not be '
                               'used at the same time.')

  if version_ids is None and module_versions is None:
    # Neither filter supplied: default to the module/version serving this
    # request, taken from the runtime environment.
    module_version = request.add_module_version()
    if os.environ['CURRENT_MODULE_ID'] != 'default':
      # The default module is denoted by an absent module id on the wire.
      module_version.set_module_id(os.environ['CURRENT_MODULE_ID'])
    module_version.set_version_id(
        os.environ['CURRENT_VERSION_ID'].split('.')[0])

  if module_versions:
    if not isinstance(module_versions, list):
      raise InvalidArgumentError('module_versions must be a list')

    # Deduplicate via a set, then emit in sorted order for a stable request.
    req_module_versions = set()
    for entry in module_versions:
      if not isinstance(entry, (list, tuple)):
        raise InvalidArgumentError('module_versions list entries must all be '
                                   'tuples or lists.')
      if len(entry) != 2:
        raise InvalidArgumentError('module_versions list entries must all be '
                                   'of length 2.')
      req_module_versions.add((entry[0], entry[1]))

    for module, version in sorted(req_module_versions):
      req_module_version = request.add_module_version()

      if module != 'default':
        req_module_version.set_module_id(module)
      req_module_version.set_version_id(version)

  if version_ids:
    if not isinstance(version_ids, list):
      raise InvalidArgumentError('version_ids must be a list')
    for version_id in version_ids:
      if not _MAJOR_VERSION_ID_RE.match(version_id):
        raise InvalidArgumentError(
            'version_ids must only contain valid major version identifiers')
      request.add_module_version().set_version_id(version_id)

  if request_ids is not None:
    if not isinstance(request_ids, list):
      raise InvalidArgumentError('request_ids must be a list')
    if not request_ids:
      raise InvalidArgumentError('request_ids must not be empty')
    if len(request_ids) != len(set(request_ids)):
      raise InvalidArgumentError('request_ids must not contain duplicates')
    for request_id in request_ids:
      if not _REQUEST_ID_RE.match(request_id):
        raise InvalidArgumentError(
            '%s is not a valid request log id' % request_id)
    # Replace the repeated field's contents in place.
    request.request_id_list()[:] = request_ids

  prototype_request = kwargs.get('prototype_request')
  if prototype_request:
    if not isinstance(prototype_request, log_service_pb.LogReadRequest):
      raise InvalidArgumentError('prototype_request must be a LogReadRequest')
    request.MergeFrom(prototype_request)

  timeout = kwargs.get('timeout')
  if timeout is not None:
    if not isinstance(timeout, (float, int, long)):
      raise InvalidArgumentError('timeout must be a float or integer')

  batch_size = kwargs.get('batch_size')
  if batch_size is not None:
    if not isinstance(batch_size, (int, long)):
      raise InvalidArgumentError('batch_size must be an integer')

    if batch_size < 1:
      raise InvalidArgumentError('batch_size must be greater than zero')

    if batch_size > MAX_ITEMS_PER_FETCH:
      raise InvalidArgumentError('batch_size specified is too large')
    request.set_count(batch_size)

  return _LogQueryResult(request, timeout=timeout)
class LogsBufferOld(object):
  """Threadsafe buffer for storing and periodically flushing app logs."""

  # Upper bound, in bytes, on the serialized log payload sent per Flush RPC.
  _MAX_FLUSH_SIZE = 1000 * 1000

  # A single (post-truncation) log line may fill an entire flush payload;
  # _flush() relies on this so that every pass makes progress.
  _MAX_LINE_SIZE = _MAX_FLUSH_SIZE
  assert _MAX_LINE_SIZE <= _MAX_FLUSH_SIZE

  def __init__(self, stream=None, stderr=False):
    """Initializes the buffer, which wraps the given stream or sys.stderr.

    The state of the LogsBuffer is protected by a separate lock.  The lock is
    acquired before any variables are mutated or accessed, and released
    afterward.  A recursive lock is used so that a single thread can acquire
    the lock multiple times, and release it only when an identical number of
    'unlock()' calls have been performed.

    Args:
      stream: A file-like object to store logs. Defaults to a cStringIO object.
      stderr: If specified, use sys.stderr as the underlying stream.
    """
    self._stderr = stderr
    if self._stderr:
      # In stderr mode the buffer never owns a private stream; stream()
      # returns sys.stderr instead.
      assert stream is None
    else:
      self._stream = stream or cStringIO.StringIO()
    self._lock = threading.RLock()
    self._reset()

  def stream(self):
    """Returns the underlying file-like object used to buffer logs."""
    if self._stderr:
      # Never buffer logs on sys.stderr; hand back the live stream each time.
      return sys.stderr
    else:
      return self._stream

  def lines(self):
    """Returns the number of log lines currently buffered."""
    with self._lock:
      return self._lines

  def bytes(self):
    """Returns the size of the log buffer, in bytes."""
    with self._lock:
      return self._bytes

  def age(self):
    """Returns the number of seconds since the log buffer was flushed."""
    with self._lock:
      return time.time() - self._flush_time

  def flush_time(self):
    """Returns last time that the log buffer was flushed."""
    with self._lock:
      return self._flush_time

  def contents(self):
    """Returns the contents of the logs buffer."""
    with self._lock:
      return self._contents()

  def _contents(self):
    """Internal version of contents() with no locking."""
    try:
      return self.stream().getvalue()
    except AttributeError:
      # In stderr mode the stream has no getvalue(); treat the buffer as
      # empty rather than fail.
      return ''

  def reset(self):
    """Resets the buffer state, without clearing the underlying stream."""
    with self._lock:
      return self._reset()

  def _reset(self):
    """Internal version of reset() with no locking.

    Re-derives the line/byte counters from whatever is currently in the
    stream and records the flush time and current request id.
    """
    contents = self._contents()
    self._bytes = len(contents)
    # Count only newline-terminated lines: a trailing partial line is not
    # counted until its '\n' arrives.
    self._lines = len(contents.split('\n')) - 1
    self._flush_time = time.time()
    self._request = logsutil.RequestID()

  def clear(self):
    """Clears the contents of the logs buffer, and resets autoflush state."""
    with self._lock:
      return self._clear()

  def _clear(self):
    """Internal version of clear() with no locking."""
    if self._bytes > 0:
      # NOTE(review): relies on truncate(0) also leaving the write position
      # at 0 for the cStringIO stream — confirm for the targeted runtime.
      self.stream().truncate(0)
    self._reset()

  def close(self):
    """Closes the underlying stream, flushing the current contents."""
    with self._lock:
      return self._close()

  def _close(self):
    """Internal version of close() with no locking."""
    self._flush()
    self.stream().close()

  def parse_logs(self):
    """Parse the contents of the buffer and return an array of log lines."""
    return logsutil.ParseLogs(self.contents())

  def write(self, line):
    """Writes a line to the logs buffer."""
    with self._lock:
      return self._write(line)

  def writelines(self, seq):
    """Writes each line in the given sequence to the logs buffer."""
    for line in seq:
      self.write(line)

  def _write(self, line):
    """Writes a line to the logs buffer.

    Also bumps the line/byte counters and triggers an autoflush check.
    """
    if self._request != logsutil.RequestID():
      # The current request id no longer matches the one captured at the
      # last reset — presumably a new request is now being served; re-sync
      # the counters so stale state is not carried over (TODO: confirm
      # RequestID semantics against logsutil).
      self._reset()
    self.stream().write(line)
    self._lines += 1
    self._bytes += len(line)
    self._autoflush()

  @staticmethod
  def _truncate(line, max_length=_MAX_LINE_SIZE):
    """Truncates a potentially long log down to a specified maximum length.

    Args:
      line: The log line (string) to truncate.
      max_length: Maximum length of the returned string, suffix included.

    Returns:
      The line unchanged if it fits, otherwise a prefix of it followed by a
      '...(length N)' marker recording the original length; the result is
      exactly max_length characters long.
    """
    if len(line) > max_length:
      original_length = len(line)
      suffix = '...(length %d)' % original_length
      line = line[:max_length - len(suffix)] + suffix
    return line

  def flush(self):
    """Flushes the contents of the logs buffer.

    This method holds the buffer lock until the API call has finished to
    ensure that flush calls are performed in the correct order, so that log
    messages written during the flush call aren't dropped or accidentally
    wiped, and so that the other buffer state variables (flush time, lines,
    bytes) are updated synchronously with the flush.
    """
    with self._lock:
      self._flush()

  def _flush(self):
    """Internal version of flush() with no locking.

    Drains the buffer and sends its parsed log lines to the logservice
    backend, one Flush RPC per group of at most _MAX_FLUSH_SIZE serialized
    bytes.  Always issues at least one RPC, even for an empty buffer.
    """
    logs = self.parse_logs()
    self._clear()

    while True:
      group = log_service_pb.UserAppLogGroup()
      byte_size = 0
      n = 0
      for timestamp_usec, level, message in logs:
        # Cap each message so a single line can never exceed a full payload.
        message = self._truncate(message, LogsBufferOld._MAX_LINE_SIZE)

        # Stop filling this group before it would exceed the flush limit;
        # remaining lines go into the next RPC.
        if byte_size + len(message) > LogsBufferOld._MAX_FLUSH_SIZE:
          break
        line = group.add_log_line()
        line.set_timestamp_usec(timestamp_usec)
        line.set_level(level)
        line.set_message(message)
        # Wire-size accounting: 1 byte for the field tag plus the
        # length-delimited encoding of the line message.
        byte_size += 1 + group.lengthString(line.ByteSize())
        n += 1
      # Progress guarantee: since _MAX_LINE_SIZE <= _MAX_FLUSH_SIZE, the
      # first (truncated) line always fits, so n == 0 implies logs is empty.
      assert n > 0 or not logs
      logs = logs[n:]

      request = log_service_pb.FlushRequest()
      request.set_logs(group.Encode())
      response = api_base_pb.VoidProto()
      apiproxy_stub_map.MakeSyncCall('logservice', 'Flush', request, response)
      if not logs:
        break

  def autoflush(self):
    """Flushes the buffer if certain conditions have been met."""
    with self._lock:
      return self._autoflush()

  def _autoflush(self):
    """Internal version of autoflush() with no locking."""
    if not self.autoflush_enabled():
      return

    # Flush when any enabled threshold (age, line count, byte count) is
    # reached; a threshold set to 0/None is disabled.
    if ((AUTOFLUSH_EVERY_SECONDS and self.age() >= AUTOFLUSH_EVERY_SECONDS) or
        (AUTOFLUSH_EVERY_LINES and self.lines() >= AUTOFLUSH_EVERY_LINES) or
        (AUTOFLUSH_EVERY_BYTES and self.bytes() >= AUTOFLUSH_EVERY_BYTES)):
      self._flush()

  def autoflush_enabled(self):
    """Indicates if the buffer will periodically flush logs during a request."""
    return AUTOFLUSH_ENABLED
# Public alias for the buffer implementation currently in use; presently
# bound to the original implementation above.
LogsBuffer = LogsBufferOld

# Module-level singleton, created at import time, that buffers the app's
# logs on top of sys.stderr.
_global_buffer = LogsBuffer(stderr=True)