App Engine Python SDK version 1.7.7
[gae.git] / python / google / appengine / api / logservice / logservice.py
blobbeb151b94335bf6ffb4b447a614cc6d0cb6eebff
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 """
22 LogService API.
24 This module allows apps to flush logs, provide status messages, and
25 programmatically access their request and application logs.
26 """
33 import base64
34 import cStringIO
35 import os
36 import re
37 import sys
38 import threading
39 import time
40 import warnings
42 from google.net.proto import ProtocolBuffer
43 from google.appengine.api import api_base_pb
44 from google.appengine.api import apiproxy_stub_map
45 from google.appengine.api.logservice import log_service_pb
46 from google.appengine.api.logservice import logsutil
47 from google.appengine.datastore import datastore_rpc
48 from google.appengine.runtime import apiproxy_errors
# Whether the buffer automatically flushes when thresholds below are crossed.
AUTOFLUSH_ENABLED = True

# Autoflush when the buffer is at least this old, large, or long.
AUTOFLUSH_EVERY_SECONDS = 60
AUTOFLUSH_EVERY_BYTES = 4096
AUTOFLUSH_EVERY_LINES = 50

# Upper bound accepted for the 'batch_size' kwarg of fetch().
MAX_ITEMS_PER_FETCH = 1000

# Application log severity levels, in ascending order.
LOG_LEVEL_DEBUG = 0
LOG_LEVEL_INFO = 1
LOG_LEVEL_WARNING = 2
LOG_LEVEL_ERROR = 3
LOG_LEVEL_CRITICAL = 4

# Lowercase alphanumerics and dashes, not starting with a dash; server ids
# are limited to 63 characters and versions to 100.
SERVER_ID_RE_STRING = r'(?!-)[a-z\d\-]{1,63}'
SERVER_VERSION_RE_STRING = r'(?!-)[a-z\d\-]{1,100}'
# Matches 'version' or 'server:version'; group 1 is the optional server id,
# group 2 the major version id.
_MAJOR_VERSION_ID_PATTERN = r'^(?:(?:(%s):)?)(%s)$' % (SERVER_ID_RE_STRING,
                                                       SERVER_VERSION_RE_STRING)

_MAJOR_VERSION_ID_RE = re.compile(_MAJOR_VERSION_ID_PATTERN)

# Request log ids are non-empty hexadecimal strings.
_REQUEST_ID_PATTERN = r'^[\da-fA-F]+$'
_REQUEST_ID_RE = re.compile(_REQUEST_ID_PATTERN)
class Error(Exception):
  """Base error class for this module."""
class InvalidArgumentError(Error):
  """Function argument has invalid value."""
class TimeoutError(Error):
  """Requested timeout for fetch() call has expired while iterating results."""

  def __init__(self, msg, offset, last_end_time):
    """Constructor.

    Args:
      msg: A message (string) describing the timeout.
      offset: A byte string marking the position reached in the result
        stream, or None.
      last_end_time: Completion time in seconds of the last request
        examined, or None.
    """
    Error.__init__(self, msg)
    self.__offset = offset
    self.__last_end_time = last_end_time

  @property
  def offset(self):
    """Binary offset indicating the current position in the result stream.

    May be submitted to future Log read requests to continue iterating logs
    starting exactly where this iterator left off.

    Returns:
      A byte string representing an offset into the log stream, or None.
    """
    return self.__offset

  @property
  def last_end_time(self):
    """End time of the last request examined prior to the timeout, or None.

    Returns:
      A float representing the completion time in seconds since the Unix
      epoch of the last request examined.
    """
    return self.__last_end_time
class LogsBuffer(object):
  """Threadsafe buffer for storing and periodically flushing app logs."""

  # Maximum total bytes encoded into a single Flush API call; also the
  # maximum size of one log line (longer lines are truncated in _flush).
  _MAX_FLUSH_SIZE = int(1e6)
  _MAX_LINE_SIZE = _MAX_FLUSH_SIZE
  assert _MAX_LINE_SIZE <= _MAX_FLUSH_SIZE

  def __init__(self, stream=None, stderr=False):
    """Initializes the buffer, which wraps the given stream or sys.stderr.

    The state of the LogsBuffer is protected by a separate lock. The lock is
    acquired before any variables are mutated or accessed, and released
    afterward. A recursive lock is used so that a single thread can acquire the
    lock multiple times, and release it only when an identical number of
    'unlock()' calls have been performed.

    Args:
      stream: A file-like object to store logs. Defaults to a cStringIO object.
      stderr: If specified, use sys.stderr as the underlying stream.
    """
    self._stderr = stderr
    if self._stderr:
      # A private stream and stderr mode are mutually exclusive.
      assert stream is None
    else:
      self._stream = stream or cStringIO.StringIO()
    self._lock = threading.RLock()
    self._reset()

  def _lock_and_call(self, method, *args):
    """Calls 'method' while holding the buffer lock."""
    self._lock.acquire()
    try:
      return method(*args)
    finally:
      self._lock.release()

  def stream(self):
    """Returns the underlying file-like object used to buffer logs."""
    if self._stderr:
      return sys.stderr
    else:
      return self._stream

  def lines(self):
    """Returns the number of log lines currently buffered."""
    return self._lock_and_call(lambda: self._lines)

  def bytes(self):
    """Returns the size of the log buffer, in bytes."""
    return self._lock_and_call(lambda: self._bytes)

  def age(self):
    """Returns the number of seconds since the log buffer was flushed."""
    return self._lock_and_call(lambda: time.time() - self._flush_time)

  def flush_time(self):
    """Returns last time that the log buffer was flushed."""
    return self._lock_and_call(lambda: self._flush_time)

  def contents(self):
    """Returns the contents of the logs buffer."""
    return self._lock_and_call(self._contents)

  def _contents(self):
    """Internal version of contents() with no locking."""
    try:
      return self.stream().getvalue()
    except AttributeError:
      # sys.stderr has no getvalue(); in stderr mode the buffered contents
      # cannot be read back, so report an empty buffer.
      return ''

  def reset(self):
    """Resets the buffer state, without clearing the underlying stream."""
    self._lock_and_call(self._reset)

  def _reset(self):
    """Internal version of reset() with no locking."""
    contents = self._contents()
    self._bytes = len(contents)
    # Count newline-terminated lines only ('\n'.split yields one extra item).
    self._lines = len(contents.split('\n')) - 1
    self._flush_time = time.time()
    self._request = logsutil.RequestID()

  def clear(self):
    """Clears the contents of the logs buffer, and resets autoflush state."""
    self._lock_and_call(self._clear)

  def _clear(self):
    """Internal version of clear() with no locking."""
    if self._bytes > 0:
      self.stream().truncate(0)
    self._reset()

  def close(self):
    """Closes the underlying stream, flushing the current contents."""
    self._lock_and_call(self._close)

  def _close(self):
    """Internal version of close() with no locking."""
    self._flush()
    self.stream().close()

  def parse_logs(self):
    """Parse the contents of the buffer and return an array of log lines."""
    return logsutil.ParseLogs(self.contents())

  def write(self, line):
    """Writes a line to the logs buffer."""
    return self._lock_and_call(self._write, line)

  def writelines(self, seq):
    """Writes each line in the given sequence to the logs buffer."""
    for line in seq:
      self.write(line)

  def _write(self, line):
    """Internal version of write() with no locking."""
    if self._request != logsutil.RequestID():
      # A new request is being handled by this process; discard stale
      # bookkeeping from the previous request.
      self._reset()
    self.stream().write(line)
    self._lines += 1
    self._bytes += len(line)
    self._autoflush()

  @staticmethod
  def _truncate(line, max_length=_MAX_LINE_SIZE):
    """Truncates a potentially long log down to a specified maximum length."""
    if len(line) > max_length:
      original_length = len(line)
      suffix = '...(length %d)' % original_length
      line = line[:max_length - len(suffix)] + suffix
    return line

  def flush(self):
    """Flushes the contents of the logs buffer.

    This method holds the buffer lock until the API call has finished to ensure
    that flush calls are performed in the correct order, so that log messages
    written during the flush call aren't dropped or accidentally wiped, and so
    that the other buffer state variables (flush time, lines, bytes) are
    updated synchronously with the flush.
    """
    self._lock_and_call(self._flush)

  def _flush(self):
    """Internal version of flush() with no locking."""
    logs = self.parse_logs()
    self._clear()

    # Always issue at least one Flush call, even with no logs, and batch
    # remaining logs into calls no larger than _MAX_FLUSH_SIZE.
    first_iteration = True
    while logs or first_iteration:
      first_iteration = False
      request = log_service_pb.FlushRequest()
      group = log_service_pb.UserAppLogGroup()
      byte_size = 0
      n = 0
      for entry in logs:
        # entry is (timestamp_usec, level, message); oversized messages are
        # truncated rather than dropped.
        if len(entry[2]) > LogsBuffer._MAX_LINE_SIZE:
          entry = list(entry)
          entry[2] = self._truncate(entry[2], LogsBuffer._MAX_LINE_SIZE)

        if byte_size + len(entry[2]) > LogsBuffer._MAX_FLUSH_SIZE:
          break
        line = group.add_log_line()
        line.set_timestamp_usec(entry[0])
        line.set_level(entry[1])
        line.set_message(entry[2])
        # One tag byte plus the length-delimited encoding of the line
        # (presumably mirrors the wire size — see protobuf encoding).
        byte_size += 1 + group.lengthString(line.ByteSize())
        n += 1
      # Guard against an infinite loop: each pass must consume at least one
      # log entry while any remain.
      assert n > 0 or not logs
      logs = logs[n:]
      request.set_logs(group.Encode())
      response = api_base_pb.VoidProto()
      apiproxy_stub_map.MakeSyncCall('logservice', 'Flush', request, response)

  def autoflush(self):
    """Flushes the buffer if certain conditions have been met."""
    self._lock_and_call(self._autoflush)

  def _autoflush(self):
    """Internal version of autoflush() with no locking."""
    if not self.autoflush_enabled():
      return

    if ((AUTOFLUSH_EVERY_SECONDS and self.age() >= AUTOFLUSH_EVERY_SECONDS) or
        (AUTOFLUSH_EVERY_LINES and self.lines() >= AUTOFLUSH_EVERY_LINES) or
        (AUTOFLUSH_EVERY_BYTES and self.bytes() >= AUTOFLUSH_EVERY_BYTES)):
      self._flush()

  def autoflush_enabled(self):
    """Indicates if the buffer will periodically flush logs during a request."""
    return AUTOFLUSH_ENABLED
# Process-wide buffer used by every request in this runtime; wraps sys.stderr.
_global_buffer = LogsBuffer(stderr=True)
def logs_buffer():
  """Returns the LogsBuffer used by the current request."""
  return _global_buffer
def write(message):
  """Adds 'message' to the logs buffer, and checks for autoflush.

  Args:
    message: A message (string) to be written to application logs.
  """
  buf = logs_buffer()
  buf.write(message)
def clear():
  """Clear the logs buffer and reset the autoflush state."""
  buf = logs_buffer()
  buf.clear()
def autoflush():
  """If AUTOFLUSH conditions have been met, performs a Flush API call."""
  buf = logs_buffer()
  buf.autoflush()
def flush():
  """Flushes log lines that are currently buffered."""
  buf = logs_buffer()
  buf.flush()
def flush_time():
  """Returns last time that the logs buffer was flushed."""
  buf = logs_buffer()
  return buf.flush_time()
def log_buffer_age():
  """Returns the number of seconds since the logs buffer was flushed."""
  buf = logs_buffer()
  return buf.age()
def log_buffer_contents():
  """Returns the contents of the logs buffer."""
  buf = logs_buffer()
  return buf.contents()
def log_buffer_bytes():
  """Returns the size of the logs buffer, in bytes."""
  buf = logs_buffer()
  return buf.bytes()
def log_buffer_lines():
  """Returns the number of log lines currently buffered."""
  buf = logs_buffer()
  return buf.lines()
class _LogQueryResult(object):
  """A container that holds a log request and provides an iterator to read logs.

  A _LogQueryResult object is the standard returned item for a call to fetch().
  It is iterable - each value returned is a log that the user has queried for,
  and internally, it holds a cursor that it uses to fetch more results once the
  current, locally held set, are exhausted.

  Properties:
    _request: A LogReadRequest that contains the parameters the user has set for
      the initial fetch call, which will be updated with a more current cursor
      if more logs are requested.
    _logs: A list of RequestLogs corresponding to logs the user has asked for.
    _read_called: A boolean that indicates if a Read call has even been made
      with the request stored in this object.
  """

  def __init__(self, request, timeout=None):
    """Constructor.

    Args:
      request: A LogReadRequest object that will be used for Read calls.
      timeout: Optional number of seconds after which iteration raises
        TimeoutError instead of issuing further Read calls.
    """
    self._request = request
    self._logs = []
    self._read_called = False
    self._last_end_time = None
    self._end_time = None
    if timeout is not None:
      self._end_time = time.time() + timeout

  def __iter__(self):
    """Provides an iterator that yields log records one at a time."""
    while True:
      # Drain the locally-held batch first.
      for log_item in self._logs:
        yield RequestLog(log_item)
      # More results remain if we have never read, or the last response
      # included a continuation offset.
      if not self._read_called or self._request.has_offset():
        if self._end_time and time.time() >= self._end_time:
          # Deadline exceeded: surface the resume position so the caller can
          # continue a later fetch() from exactly here.
          offset = None
          if self._request.has_offset():
            offset = self._request.offset().Encode()
          raise TimeoutError('A timeout occurred while iterating results',
                             offset=offset, last_end_time=self._last_end_time)
        self._read_called = True
        self._advance()
      else:
        break

  def _advance(self):
    """Acquires additional logs via cursor.

    This method is used by the iterator when it has exhausted its current set of
    logs to acquire more logs and update its internal structures accordingly.

    Raises:
      InvalidArgumentError: If the Read call rejects the request parameters.
      Error: For any other LogService error.
    """
    response = log_service_pb.LogReadResponse()

    try:
      apiproxy_stub_map.MakeSyncCall('logservice', 'Read', self._request,
                                     response)
    except apiproxy_errors.ApplicationError, e:
      if e.application_error == log_service_pb.LogServiceError.INVALID_REQUEST:
        raise InvalidArgumentError(e.error_detail)
      raise Error(e.error_detail)

    self._logs = response.log_list()
    # Carry the response's continuation offset into the next Read request.
    self._request.clear_offset()
    if response.has_offset():
      self._request.mutable_offset().CopyFrom(response.offset())
    self._last_end_time = None
    if response.has_last_end_time():
      # Convert microseconds to seconds since the Unix epoch.
      self._last_end_time = response.last_end_time() / 1e6
class RequestLog(object):
  """Complete log information about a single request to an application."""

  def __init__(self, request_log=None):
    """Initializes from an encoded string, a RequestLog proto, or nothing.

    Args:
      request_log: Either a base64-encoded string containing a serialized
        RequestLog protocol buffer (the form produced by __repr__), an
        actual log_service_pb.RequestLog instance, or None to create an
        empty log.
    """
    # isinstance() replaces the original 'type(x) is str' /
    # '__class__ ==' checks so that subclasses are handled correctly.
    if isinstance(request_log, str):
      self.__pb = log_service_pb.RequestLog(base64.b64decode(request_log))
    elif isinstance(request_log, log_service_pb.RequestLog):
      self.__pb = request_log
    else:
      self.__pb = log_service_pb.RequestLog()
    self.__lines = []

  def __repr__(self):
    # Round-trippable form: __init__ accepts this base64 payload.
    return 'RequestLog(\'%s\')' % base64.b64encode(self.__pb.Encode())

  def __str__(self):
    if self.server_id == 'default':
      return ('<RequestLog(app_id=%s, version_id=%s, request_id=%s)>' %
              (self.app_id, self.version_id, base64.b64encode(self.request_id)))
    else:
      return ('<RequestLog(app_id=%s, server_id=%s, version_id=%s, '
              'request_id=%s)>' %
              (self.app_id, self.server_id, self.version_id,
               base64.b64encode(self.request_id)))

  @property
  def _pb(self):
    """The underlying log_service_pb.RequestLog. Internal use only."""
    return self.__pb

  @property
  def app_id(self):
    """Application id that handled this request, as a string."""
    return self.__pb.app_id()

  @property
  def server_id(self):
    """Server id that handled this request, as a string."""
    return self.__pb.server_id()

  @property
  def version_id(self):
    """Version of the application that handled this request, as a string."""
    return self.__pb.version_id()

  @property
  def request_id(self):
    """Globally unique identifier for a request, based on request start time.

    Request ids for requests which started later will compare greater as
    binary strings than those for requests which started earlier.

    Returns:
      A byte string containing a unique identifier for this request.
    """
    return self.__pb.request_id()

  @property
  def offset(self):
    """Binary offset indicating current position in the result stream.

    May be submitted to future Log read requests to continue immediately after
    this request.

    Returns:
      A byte string representing an offset into the active result stream, or
      None if not available.
    """
    if self.__pb.has_offset():
      return self.__pb.offset().Encode()
    return None

  @property
  def ip(self):
    """The origin IP address of the request, as a string."""
    return self.__pb.ip()

  @property
  def nickname(self):
    """Nickname of the user that made the request if known and logged in.

    Returns:
      A string representation of the logged in user's nickname, or None.
    """
    if self.__pb.has_nickname():
      return self.__pb.nickname()
    return None

  @property
  def start_time(self):
    """Time at which request was known to have begun processing.

    Returns:
      A float representing the time this request began processing in seconds
      since the Unix epoch.
    """
    return self.__pb.start_time() / 1e6

  @property
  def end_time(self):
    """Time at which request was known to have completed.

    Returns:
      A float representing the request completion time in seconds since the
      Unix epoch.
    """
    return self.__pb.end_time() / 1e6

  @property
  def latency(self):
    """Time required to process request in seconds, as a float."""
    return self.__pb.latency() / 1e6

  @property
  def mcycles(self):
    """Number of machine cycles used to process request, as an integer."""
    return self.__pb.mcycles()

  @property
  def method(self):
    """Request method (GET, PUT, POST, etc), as a string."""
    return self.__pb.method()

  @property
  def resource(self):
    """Resource path on server requested by client.

    For example, http://nowhere.com/app would have a resource string of '/app'.

    Returns:
      A string containing the path component of the request URL.
    """
    return self.__pb.resource()

  @property
  def http_version(self):
    """HTTP version of request, as a string."""
    return self.__pb.http_version()

  @property
  def status(self):
    """Response status of request, as an int."""
    return self.__pb.status()

  @property
  def response_size(self):
    """Size in bytes sent back to client by request, as a long."""
    return self.__pb.response_size()

  @property
  def referrer(self):
    """Referrer URL of request as a string, or None."""
    if self.__pb.has_referrer():
      return self.__pb.referrer()
    return None

  @property
  def user_agent(self):
    """User agent used to make the request as a string, or None."""
    if self.__pb.has_user_agent():
      return self.__pb.user_agent()
    return None

  @property
  def url_map_entry(self):
    """File or class within URL mapping used for request.

    Useful for tracking down the source code which was responsible for managing
    request, especially for multiply mapped handlers.

    Returns:
      A string containing a file or class name.
    """
    return self.__pb.url_map_entry()

  @property
  def combined(self):
    """Apache combined log entry for request.

    The information in this field can be constructed from the rest of
    this message, however, this field is included for convenience.

    Returns:
      A string containing an Apache-style log line in the form documented at
      http://httpd.apache.org/docs/1.3/logs.html.
    """
    return self.__pb.combined()

  @property
  def api_mcycles(self):
    """Number of machine cycles spent in API calls while processing request.

    Deprecated. This value is no longer meaningful.

    Returns:
      Number of API machine cycles used as a long, or None if not available.
    """
    warnings.warn('api_mcycles does not return a meaningful value.',
                  DeprecationWarning, stacklevel=2)
    if self.__pb.has_api_mcycles():
      return self.__pb.api_mcycles()
    return None

  @property
  def host(self):
    """The Internet host and port number of the resource being requested.

    Returns:
      A string representing the host and port receiving the request, or None
      if not available.
    """
    if self.__pb.has_host():
      return self.__pb.host()
    return None

  @property
  def cost(self):
    """The estimated cost of this request, in fractional dollars.

    Returns:
      A float representing an estimated fractional dollar cost of this
      request, or None if not available.
    """
    if self.__pb.has_cost():
      return self.__pb.cost()
    return None

  @property
  def task_queue_name(self):
    """The request's queue name, if generated via the Task Queue API.

    Returns:
      A string containing the request's queue name if relevant, or None.
    """
    if self.__pb.has_task_queue_name():
      return self.__pb.task_queue_name()
    return None

  @property
  def task_name(self):
    """The request's task name, if generated via the Task Queue API.

    Returns:
      A string containing the request's task name if relevant, or None.
    """
    if self.__pb.has_task_name():
      return self.__pb.task_name()
    # Explicit None for consistency with the sibling properties (the
    # original fell off the end of the function).
    return None

  @property
  def was_loading_request(self):
    """Returns whether this request was a loading request for an instance.

    Returns:
      A bool indicating whether this request was a loading request.
    """
    return bool(self.__pb.was_loading_request())

  @property
  def pending_time(self):
    """Time this request spent in the pending request queue.

    Returns:
      A float representing the time in seconds that this request was pending.
    """
    return self.__pb.pending_time() / 1e6

  @property
  def replica_index(self):
    """The server replica that handled the request as an integer, or None."""
    if self.__pb.has_replica_index():
      return self.__pb.replica_index()
    return None

  @property
  def finished(self):
    """Whether or not this log represents a finished request, as a bool."""
    return bool(self.__pb.finished())

  @property
  def instance_key(self):
    """Mostly-unique identifier for the instance that handled the request.

    Returns:
      A string encoding of an instance key if available, or None.
    """
    if self.__pb.has_clone_key():
      return self.__pb.clone_key()
    return None

  @property
  def app_logs(self):
    """Logs emitted by the application while serving this request.

    Returns:
      A list of AppLog objects representing the log lines for this request, or
      an empty list if none were emitted or the query did not request them.
    """
    # Lazily converted from the proto on first access, then cached.
    if not self.__lines and self.__pb.line_size():
      self.__lines = [AppLog(time=line.time() / 1e6, level=line.level(),
                             message=line.log_message())
                      for line in self.__pb.line_list()]
    return self.__lines

  @property
  def app_engine_release(self):
    """App Engine Infrastructure release that served this request.

    Returns:
      A string containing App Engine version that served this request, or None
      if not available.
    """
    if self.__pb.has_app_engine_release():
      return self.__pb.app_engine_release()
    return None
class AppLog(object):
  """Application log line emitted while processing a request."""

  def __init__(self, time=None, level=None, message=None):
    """Constructor.

    Args:
      time: Time the log entry was made, in seconds since the Unix epoch.
      level: Severity of the log entry, as an int (LOG_LEVEL_* values).
      message: The application-provided log message, as a string.
    """
    self._time = time
    self._level = level
    self._message = message

  def __eq__(self, other):
    # Bug fix: the original tested 'self.level and other.level' (truthiness)
    # instead of equality, so two logs with different non-zero levels
    # compared equal, and any DEBUG (level 0) log compared unequal to itself.
    return (self.time == other.time and self.level == other.level and
            self.message == other.message)

  def __ne__(self, other):
    # Python 2 does not derive != from ==; keep the two consistent.
    return not self == other

  def __repr__(self):
    return ('AppLog(time=%f, level=%d, message=\'%s\')' %
            (self.time, self.level, self.message))

  @property
  def time(self):
    """Time log entry was made, in seconds since the Unix epoch, as a float."""
    return self._time

  @property
  def level(self):
    """Level or severity of log, as an int."""
    return self._level

  @property
  def message(self):
    """Application-provided log message, as a string."""
    return self._message
# Optional keyword arguments accepted by fetch() in addition to its named
# parameters; anything else in **kwargs raises InvalidArgumentError.
_FETCH_KWARGS = frozenset(['prototype_request', 'timeout', 'batch_size'])
@datastore_rpc._positional(0)
def fetch(start_time=None,
          end_time=None,
          offset=None,
          minimum_log_level=None,
          include_incomplete=False,
          include_app_logs=False,
          server_versions=None,
          version_ids=None,
          request_ids=None,
          **kwargs):
  """Returns an iterator yielding an application's request and application logs.

  Logs will be returned by the iterator in reverse chronological order by
  request end time, or by last flush time for requests still in progress (if
  requested). The items yielded are RequestLog objects, the contents of which
  are accessible via method calls.

  All parameters are optional.

  Args:
    start_time: The earliest request completion or last-update time that
      results should be fetched for, in seconds since the Unix epoch.
    end_time: The latest request completion or last-update time that
      results should be fetched for, in seconds since the Unix epoch.
    offset: A byte string representing an offset into the log stream, extracted
      from a previously emitted RequestLog. This iterator will begin
      immediately after the record from which the offset came.
    minimum_log_level: An application log level which serves as a filter on the
      requests returned--requests with no application log at or above the
      specified level will be omitted. Works even if include_app_logs is not
      True. In ascending order, the available log levels are:
      logservice.LOG_LEVEL_DEBUG, logservice.LOG_LEVEL_INFO,
      logservice.LOG_LEVEL_WARNING, logservice.LOG_LEVEL_ERROR,
      and logservice.LOG_LEVEL_CRITICAL.
    include_incomplete: Whether or not to include requests that have started but
      not yet finished, as a boolean. Defaults to False.
    include_app_logs: Whether or not to include application level logs in the
      results, as a boolean. Defaults to False.
    server_versions: A list of tuples of the form (server, version), that
      indicate that the logs for the given server/version combination should be
      fetched. Duplicate tuples will be ignored. This kwarg may not be used
      in conjunction with the 'version_ids' kwarg.
    version_ids: A list of version ids whose logs should be queried against.
      Defaults to the application's current version id only. This kwarg may not
      be used in conjunction with the 'server_versions' kwarg.
    request_ids: If not None, indicates that instead of a time-based scan, logs
      for the specified requests should be returned. Malformed request IDs will
      cause the entire request to be rejected, while any requests that are
      unknown will be ignored. This option may not be combined with any
      filtering options such as start_time, end_time, offset, or
      minimum_log_level. version_ids is ignored. IDs that do not correspond to
      a request log will be ignored. Logs will be returned in the order
      requested.
    **kwargs: See _FETCH_KWARGS for the optional keyword arguments accepted
      ('prototype_request', 'timeout', 'batch_size').

  Returns:
    An iterable object containing the logs that the user has queried for.

  Raises:
    InvalidArgumentError: Raised if any of the input parameters are not of the
      correct type.
  """
  # Reject any keyword argument that is not explicitly supported.
  args_diff = set(kwargs) - _FETCH_KWARGS
  if args_diff:
    raise InvalidArgumentError('Invalid arguments: %s' % ', '.join(args_diff))

  request = log_service_pb.LogReadRequest()

  request.set_app_id(os.environ['APPLICATION_ID'])

  # Times are converted to microseconds for the LogReadRequest proto.
  if start_time is not None:
    if not isinstance(start_time, (float, int, long)):
      raise InvalidArgumentError('start_time must be a float or integer')
    request.set_start_time(long(start_time * 1000000))

  if end_time is not None:
    if not isinstance(end_time, (float, int, long)):
      raise InvalidArgumentError('end_time must be a float or integer')
    request.set_end_time(long(end_time * 1000000))

  if offset is not None:
    try:
      request.mutable_offset().ParseFromString(offset)
    except (TypeError, ProtocolBuffer.ProtocolBufferDecodeError):
      raise InvalidArgumentError('offset must be a string or read-only buffer')

  if minimum_log_level is not None:
    if not isinstance(minimum_log_level, int):
      raise InvalidArgumentError('minimum_log_level must be an int')

    # 'not in' idiom replaces the original 'not x in'; the error message was
    # also a triple-quoted string with an embedded newline and indentation.
    if minimum_log_level not in range(LOG_LEVEL_CRITICAL + 1):
      raise InvalidArgumentError(
          'minimum_log_level must be between 0 and 4 inclusive')
    request.set_minimum_log_level(minimum_log_level)

  if not isinstance(include_incomplete, bool):
    raise InvalidArgumentError('include_incomplete must be a boolean')
  request.set_include_incomplete(include_incomplete)

  if not isinstance(include_app_logs, bool):
    raise InvalidArgumentError('include_app_logs must be a boolean')
  request.set_include_app_logs(include_app_logs)

  if version_ids and server_versions:
    raise InvalidArgumentError('version_ids and server_versions may not be '
                               'used at the same time.')

  if version_ids is None and server_versions is None:
    # Default to this application's current major version.
    version_id = os.environ['CURRENT_VERSION_ID']
    request.add_server_version().set_version_id(version_id.split('.')[0])

  if server_versions:
    if not isinstance(server_versions, list):
      raise InvalidArgumentError('server_versions must be a list')

    req_server_versions = set()
    for entry in server_versions:
      if not isinstance(entry, (list, tuple)):
        raise InvalidArgumentError('server_versions list entries must all be '
                                   'tuples or lists.')
      if len(entry) != 2:
        raise InvalidArgumentError('server_versions list entries must all be '
                                   'of length 2.')
      # Deduplicate (server, version) pairs before building the request.
      req_server_versions.add((entry[0], entry[1]))

    for server, version in sorted(req_server_versions):
      req_server_version = request.add_server_version()
      # The 'default' server is represented by an unset server id.
      if server != 'default':
        req_server_version.set_server_id(server)
      req_server_version.set_version_id(version)

  if version_ids:
    if not isinstance(version_ids, list):
      raise InvalidArgumentError('version_ids must be a list')
    for version_id in version_ids:
      if not _MAJOR_VERSION_ID_RE.match(version_id):
        raise InvalidArgumentError(
            'version_ids must only contain valid major version identifiers')
      request.add_server_version().set_version_id(version_id)

  if request_ids is not None:
    if not isinstance(request_ids, list):
      raise InvalidArgumentError('request_ids must be a list')
    if not request_ids:
      raise InvalidArgumentError('request_ids must not be empty')
    if len(request_ids) != len(set(request_ids)):
      raise InvalidArgumentError('request_ids must not contain duplicates')
    for request_id in request_ids:
      if not _REQUEST_ID_RE.match(request_id):
        raise InvalidArgumentError(
            '%s is not a valid request log id' % request_id)
    request.request_id_list()[:] = request_ids

  prototype_request = kwargs.get('prototype_request')
  if prototype_request:
    if not isinstance(prototype_request, log_service_pb.LogReadRequest):
      raise InvalidArgumentError('prototype_request must be a LogReadRequest')
    request.MergeFrom(prototype_request)

  timeout = kwargs.get('timeout')
  if timeout is not None:
    if not isinstance(timeout, (float, int, long)):
      raise InvalidArgumentError('timeout must be a float or integer')

  batch_size = kwargs.get('batch_size')
  if batch_size is not None:
    if not isinstance(batch_size, (int, long)):
      raise InvalidArgumentError('batch_size must be an integer')

    if batch_size < 1:
      raise InvalidArgumentError('batch_size must be greater than zero')

    if batch_size > MAX_ITEMS_PER_FETCH:
      raise InvalidArgumentError('batch_size specified is too large')
    request.set_count(batch_size)

  return _LogQueryResult(request, timeout=timeout)