App Engine Python SDK version 1.9.13
[gae.git] / python / google / appengine / api / logservice / logservice_stub.py
blob b44d27e6de15c689ebb874d499b77b00a200daae
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub implementation for Log Service that uses sqlite."""
19 import atexit
20 import codecs
21 import logging
22 import time
24 import sqlite3
26 from google.appengine.api import apiproxy_stub
27 from google.appengine.api import appinfo
28 from google.appengine.api.logservice import log_service_pb
29 from google.appengine.runtime import apiproxy_errors
# Schema of the table holding one row per request. Columns that are only known
# once the request has ended (end_time, status, response_size, ...) carry
# defaults so a row can be inserted when the request starts.
_REQUEST_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS RequestLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  user_request_id TEXT NOT NULL,
  app_id TEXT NOT NULL,
  version_id TEXT NOT NULL,
  module TEXT NOT NULL,
  ip TEXT NOT NULL,
  nickname TEXT NOT NULL,
  start_time INTEGER NOT NULL,
  end_time INTEGER DEFAULT 0 NOT NULL,
  method TEXT NOT NULL,
  resource TEXT NOT NULL,
  http_version TEXT NOT NULL,
  status INTEGER DEFAULT 0 NOT NULL,
  response_size INTEGER DEFAULT 0 NOT NULL,
  user_agent TEXT NOT NULL,
  url_map_entry TEXT DEFAULT '' NOT NULL,
  host TEXT NOT NULL,
  task_queue_name TEXT DEFAULT '' NOT NULL,
  task_name TEXT DEFAULT '' NOT NULL,
  latency INTEGER DEFAULT 0 NOT NULL,
  mcycles INTEGER DEFAULT 0 NOT NULL,
  finished INTEGER DEFAULT 0 NOT NULL
);
"""

# Migration for an existing RequestLogs table that has no "module" column:
# adds the column, defaulting existing rows to the default module name.
_REQUEST_LOG_ADD_MODULE_COLUMN = """
ALTER TABLE RequestLogs
  ADD COLUMN module TEXT DEFAULT '%s' NOT NULL;
""" % appinfo.DEFAULT_MODULE

# Schema of the table holding application log lines; each line references the
# RequestLogs row of the request that produced it.
_APP_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS AppLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  request_id INTEGER NOT NULL,
  timestamp INTEGER NOT NULL,
  level INTEGER NOT NULL,
  message TEXT NOT NULL,
  FOREIGN KEY(request_id) REFERENCES RequestLogs(id)
);
"""
class LogServiceStub(apiproxy_stub.APIProxyStub):
  """Python stub for Log Service service.

  Request records are stored in the RequestLogs table and application log
  lines in the AppLogs table of a sqlite database (in-memory unless a file
  path is given to the initializer).
  """

  # NOTE(review): the two flags below are presumably read by
  # apiproxy_stub.APIProxyStub; their exact semantics are defined there.
  THREADSAFE = True

  _ACCEPTS_REQUEST_ID = True

  # Number of request logs returned by Read when the request sets no count.
  _DEFAULT_READ_COUNT = 20

  # Minimum number of seconds between two commits issued by _maybe_commit.
  _MIN_COMMIT_INTERVAL = 5

  def __init__(self, persist=False, logs_path=None, request_data=None):
    """Initializer.

    Args:
      persist: For backwards compatibility. Has no effect.
      logs_path: A str containing the filename to use for logs storage. Defaults
        to in-memory if unset.
      request_data: A apiproxy_stub.RequestData instance used to look up state
        associated with the request that generated an API call.
    """
    super(LogServiceStub, self).__init__('logservice',
                                         request_data=request_data)
    # Maps the request_id of every started-but-not-ended request to the id of
    # its RequestLogs row.
    self._request_id_to_request_row_id = {}
    if logs_path is None:
      logs_path = ':memory:'
    self._conn = sqlite3.connect(logs_path, check_same_thread=False)
    # Rows are accessed by column name throughout this class.
    self._conn.row_factory = sqlite3.Row
    self._conn.execute(_REQUEST_LOG_CREATE)
    self._conn.execute(_APP_LOG_CREATE)

    # A RequestLogs table that already existed may lack the "module" column;
    # add it when missing.
    column_names = set(
        column['name']
        for column in self._conn.execute('PRAGMA table_info(RequestLogs)'))
    if 'module' not in column_names:
      self._conn.execute(_REQUEST_LOG_ADD_MODULE_COLUMN)

    self._last_commit = time.time()
    # Commits are rate limited (see _maybe_commit), so commit whatever is
    # still pending when the interpreter exits.
    atexit.register(self._conn.commit)

  @staticmethod
  def _get_time_usec():
    """Returns the current time as an int number of microseconds."""
    return int(time.time() * 1e6)

  def _maybe_commit(self):
    """Commits if more than _MIN_COMMIT_INTERVAL seconds passed since the last.
    """
    now = time.time()
    if (now - self._last_commit) > self._MIN_COMMIT_INTERVAL:
      self._conn.commit()
      self._last_commit = now

  @apiproxy_stub.Synchronized
  def start_request(self, request_id, user_request_id, ip, app_id, version_id,
                    nickname, user_agent, host, method, resource, http_version,
                    start_time=None, module=None):
    """Starts logging for a request.

    Each start_request call must be followed by a corresponding end_request call
    to cleanup resources allocated in start_request.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      user_request_id: A user-visible unique string for retrieving the request
        log at a later time.
      ip: The user's IP address.
      app_id: A string representing the application ID that this request
        corresponds to.
      version_id: A string representing the version ID that this request
        corresponds to. Only the part before the first '.' is stored. If None,
        'NO-VERSION' is used.
      nickname: A string representing the user that has made this request (that
        is, the user's nickname, e.g., 'foobar' for a user logged in as
        'foobar@gmail.com').
      user_agent: A string representing the agent used to make this request.
      host: A string representing the host that received this request.
      method: A string containing the HTTP method of this request.
      resource: A string containing the path and query string of this request.
      http_version: A string containing the HTTP version of this request.
      start_time: An int containing the start time in micro-seconds. If unset,
        the current time is used.
      module: The string name of the module handling this request. If unset,
        appinfo.DEFAULT_MODULE is used.
    """
    if module is None:
      module = appinfo.DEFAULT_MODULE
    if version_id is None:
      version_id = 'NO-VERSION'
    major_version_id = version_id.split('.', 1)[0]
    if start_time is None:
      start_time = self._get_time_usec()
    cursor = self._conn.execute(
        'INSERT INTO RequestLogs (user_request_id, ip, app_id, version_id, '
        'nickname, user_agent, host, start_time, method, resource, '
        'http_version, module)'
        ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
            user_request_id, ip, app_id, major_version_id, nickname, user_agent,
            host, start_time, method, resource, http_version, module))
    # Remember the row so end_request and Flush can find it by request_id.
    self._request_id_to_request_row_id[request_id] = cursor.lastrowid
    self._maybe_commit()

  @apiproxy_stub.Synchronized
  def end_request(self, request_id, status, response_size, end_time=None):
    """Ends logging for a request.

    Does nothing if no start_request call is pending for request_id.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      status: An int containing the HTTP status code for this request.
      response_size: An int containing the content length of the response.
      end_time: An int containing the end time in micro-seconds. If unset, the
        current time is used.
    """
    row_id = self._request_id_to_request_row_id.pop(request_id, None)
    if row_id is None:
      return
    if end_time is None:
      end_time = self._get_time_usec()
    self._conn.execute(
        'UPDATE RequestLogs SET '
        'status = ?, response_size = ?, end_time = ?, finished = 1 '
        'WHERE id = ?', (
            status, response_size, end_time, row_id))
    self._maybe_commit()

  def _Dynamic_Flush(self, request, unused_response, request_id):
    """Writes application-level log messages for a request."""
    group = log_service_pb.UserAppLogGroup(request.logs())
    self._insert_app_logs(request_id, group.log_line_list())

  @apiproxy_stub.Synchronized
  def _insert_app_logs(self, request_id, log_lines):
    """Stores application log lines against the row of a started request.

    Lines for a request_id with no pending start_request call are dropped.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      log_lines: An iterable of log line objects exposing message(),
        timestamp_usec() and level().
    """
    row_id = self._request_id_to_request_row_id.get(request_id)
    if row_id is None:
      return
    new_app_logs = (self._tuple_from_log_line(row_id, log_line)
                    for log_line in log_lines)
    self._conn.executemany(
        'INSERT INTO AppLogs (request_id, timestamp, level, message) VALUES '
        '(?, ?, ?, ?)', new_app_logs)
    self._maybe_commit()

  @staticmethod
  def _tuple_from_log_line(row_id, log_line):
    """Converts a log line into the parameter tuple of an AppLogs INSERT.

    Args:
      row_id: The id of the RequestLogs row the line belongs to.
      log_line: A log line object exposing message(), timestamp_usec() and
        level().

    Returns:
      A (row_id, timestamp, level, message) tuple.
    """
    message = log_line.message()
    # Byte strings are decoded as UTF-8, replacing undecodable bytes. `bytes`
    # is the same type as `str` on Python 2 and the byte-string type on
    # Python 3, so the check holds on both.
    if isinstance(message, bytes):
      message = codecs.decode(message, 'utf-8', 'replace')
    return (row_id, log_line.timestamp_usec(), log_line.level(), message)

  @apiproxy_stub.Synchronized
  def _Dynamic_Read(self, request, response, request_id):
    """Reads request logs, by user request id or by filters.

    Args:
      request: The incoming log read request.
      response: The log read response to add the matching logs to.
      request_id: A unique string identifying the request associated with the
        API call. Unused.

    Raises:
      apiproxy_errors.ApplicationError: with INVALID_REQUEST if the request
        names no module version, version id or request id; if it sets both
        module versions and version ids; or if it combines request ids with a
        start time, end time or offset.
    """
    if (request.module_version_size() < 1 and
        request.version_id_size() < 1 and
        request.request_id_size() < 1):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.module_version_size() > 0 and request.version_id_size() > 0:
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if (request.request_id_size() and
        (request.has_start_time() or request.has_end_time() or
         request.has_offset())):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.request_id_size():
      # Direct lookup by user-visible request id; unknown ids are skipped.
      for user_request_id in request.request_id_list():
        log_row = self._conn.execute(
            'SELECT * FROM RequestLogs WHERE user_request_id = ?',
            (user_request_id,)).fetchone()
        if log_row:
          log = response.add_log()
          self._fill_request_log(log_row, log, request.include_app_logs())
      return

    if request.has_count():
      count = request.count()
    else:
      count = self._DEFAULT_READ_COUNT
    filters, values = self._extract_read_filters(request)
    filter_string = ' WHERE %s' % ' and '.join(filters)

    if request.has_minimum_log_level():
      # The level filter refers to AppLogs, so join it in and collapse the
      # result back to one row per request.
      query = ('SELECT * FROM RequestLogs INNER JOIN AppLogs ON '
               'RequestLogs.id = AppLogs.request_id%s GROUP BY '
               'RequestLogs.id ORDER BY id DESC')
    else:
      query = 'SELECT * FROM RequestLogs%s ORDER BY id DESC'
    # Fetch one row more than requested to learn whether more results exist.
    logs = self._conn.execute(query % filter_string,
                              values).fetchmany(count + 1)
    if logging.getLogger(__name__).isEnabledFor(logging.DEBUG):
      self._debug_query(filter_string, values, len(logs))
    for log_row in logs[:count]:
      log = response.add_log()
      self._fill_request_log(log_row, log, request.include_app_logs())
    if len(logs) > count:
      # logs holds count + 1 rows here, so logs[-2] is the last row that was
      # returned; its id is the offset from which the next read continues.
      response.mutable_offset().set_request_id(str(logs[-2]['id']))

  def _debug_query(self, filter_string, values, result_count):
    """Logs a read query, its result count and all request rows at DEBUG level.

    Args:
      filter_string: The WHERE clause of the query.
      values: The values bound to the placeholders of filter_string.
      result_count: The number of rows the query fetched.
    """
    logging.debug('\n\n')
    logging.debug(filter_string)
    logging.debug(values)
    logging.debug('%d results.', result_count)
    logging.debug('DB dump:')
    for row in self._conn.execute('SELECT * FROM RequestLogs'):
      logging.debug('%r %r %d %d %s', row['module'], row['version_id'],
                    row['start_time'], row['end_time'],
                    'COMPLETE' if row['finished'] else 'INCOMPLETE')

  def _fill_request_log(self, log_row, log, include_app_logs):
    """Copies a RequestLogs row into a request log record of the response.

    Args:
      log_row: A row selected from RequestLogs, indexable by column name.
      log: The request log record to fill, as returned by response.add_log().
      include_app_logs: If true, the AppLogs rows recorded for the request are
        added to the record as well.
    """
    log.set_request_id(str(log_row['user_request_id']))
    log.set_app_id(log_row['app_id'])
    log.set_version_id(log_row['version_id'])
    log.set_module_id(log_row['module'])
    log.set_ip(log_row['ip'])
    log.set_nickname(log_row['nickname'])
    log.set_start_time(log_row['start_time'])
    log.set_host(log_row['host'])
    log.set_end_time(log_row['end_time'])
    log.set_method(log_row['method'])
    log.set_resource(log_row['resource'])
    log.set_status(log_row['status'])
    log.set_response_size(log_row['response_size'])
    log.set_http_version(log_row['http_version'])
    log.set_user_agent(log_row['user_agent'])
    log.set_url_map_entry(log_row['url_map_entry'])
    log.set_latency(log_row['latency'])
    log.set_mcycles(log_row['mcycles'])
    log.set_finished(log_row['finished'])
    log.mutable_offset().set_request_id(str(log_row['id']))
    # Unfinished requests have end_time 0, so fall back to the start time.
    # Floor division keeps whole seconds on both Python 2 and Python 3.
    time_seconds = (log_row['end_time'] or log_row['start_time']) // 10**6
    date_string = time.strftime('%d/%b/%Y:%H:%M:%S %z',
                                time.localtime(time_seconds))
    log.set_combined('%s - %s [%s] "%s %s %s" %d %d - "%s"' %
                     (log_row['ip'], log_row['nickname'], date_string,
                      log_row['method'], log_row['resource'],
                      log_row['http_version'], log_row['status'] or 0,
                      log_row['response_size'] or 0, log_row['user_agent']))
    if include_app_logs:
      log_messages = self._conn.execute(
          'SELECT timestamp, level, message FROM AppLogs '
          'WHERE request_id = ?',
          (log_row['id'],)).fetchall()
      for message in log_messages:
        line = log.add_line()
        line.set_time(message['timestamp'])
        line.set_level(message['level'])
        line.set_log_message(message['message'])

  @staticmethod
  def _extract_read_filters(request):
    """Extracts SQL filters from the LogReadRequest.

    Args:
      request: the incoming LogReadRequest.

    Returns:
      a pair of (filters, values); filters is a list of SQL filter expressions,
      to be joined by AND operators; values is a list of values to be
      interpolated into the filter expressions by the db library.

    Raises:
      apiproxy_errors.ApplicationError: with INVALID_REQUEST if the request
        offset is not an integer.
    """
    filters = []
    values = []

    # One (version, module) clause per requested module version, OR-ed
    # together. A module version without a module id means the default module.
    module_filters = []
    module_values = []
    for module_version in request.module_version_list():
      module_filters.append('(version_id = ? AND module = ?)')
      module_values.append(module_version.version_id())
      module = appinfo.DEFAULT_MODULE
      if module_version.has_module_id():
        module = module_version.module_id()
      module_values.append(module)
    if module_filters:
      filters.append('(' + ' or '.join(module_filters) + ')')
      values += module_values

    if request.has_offset():
      # Only the conversion can fail, so keep it alone in the try block.
      try:
        offset_row_id = int(request.offset().request_id())
      except ValueError:
        logging.error('Bad offset in log request: "%s"', request.offset())
        raise apiproxy_errors.ApplicationError(
            log_service_pb.LogServiceError.INVALID_REQUEST)
      filters.append('RequestLogs.id < ?')
      values.append(offset_row_id)
    if request.has_minimum_log_level():
      filters.append('AppLogs.level >= ?')
      values.append(request.minimum_log_level())

    # The time range is matched against end_time for finished requests and
    # against start_time for unfinished ones.
    finished_filter = 'finished = 1 '
    finished_filter_values = []
    unfinished_filter = 'finished = 0'
    unfinished_filter_values = []

    if request.has_start_time():
      finished_filter += ' and end_time >= ? '
      finished_filter_values.append(request.start_time())
      unfinished_filter += ' and start_time >= ? '
      unfinished_filter_values.append(request.start_time())
    if request.has_end_time():
      finished_filter += ' and end_time < ? '
      finished_filter_values.append(request.end_time())
      unfinished_filter += ' and start_time < ? '
      unfinished_filter_values.append(request.end_time())

    if request.include_incomplete():
      # The values must follow the order of the placeholders: finished first.
      filters.append(
          '((' + finished_filter + ') or (' + unfinished_filter + '))')
      values += finished_filter_values + unfinished_filter_values
    else:
      filters.append(finished_filter)
      values += finished_filter_values

    return filters, values

  def _Dynamic_SetStatus(self, unused_request, unused_response,
                         unused_request_id):
    """Not implemented by this stub."""
    raise NotImplementedError

  def _Dynamic_Usage(self, unused_request, unused_response, unused_request_id):
    """Always raises CapabilityDisabledError; Usage is not allowed in tests."""
    raise apiproxy_errors.CapabilityDisabledError('Usage not allowed in tests.')