App Engine Python SDK version 1.8.9
[gae.git] / python / google / appengine / api / logservice / logservice_stub.py
blob1fbe9d91da6e035ceb8bc4f143a2dfd510803bb5
1 #!/usr/bin/env python
3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub implementation for Log Service that uses sqlite."""
19 import atexit
20 import codecs
21 import logging
22 import time
24 import sqlite3
26 from google.appengine.api import apiproxy_stub
27 from google.appengine.api import appinfo
28 from google.appengine.api.logservice import log_service_pb
29 from google.appengine.runtime import apiproxy_errors
32 _REQUEST_LOG_CREATE = """
33 CREATE TABLE IF NOT EXISTS RequestLogs (
34 id INTEGER NOT NULL PRIMARY KEY,
35 user_request_id TEXT NOT NULL,
36 app_id TEXT NOT NULL,
37 version_id TEXT NOT NULL,
38 module TEXT NOT NULL,
39 ip TEXT NOT NULL,
40 nickname TEXT NOT NULL,
41 start_time INTEGER NOT NULL,
42 end_time INTEGER DEFAULT 0 NOT NULL,
43 method TEXT NOT NULL,
44 resource TEXT NOT NULL,
45 http_version TEXT NOT NULL,
46 status INTEGER DEFAULT 0 NOT NULL,
47 response_size INTEGER DEFAULT 0 NOT NULL,
48 user_agent TEXT NOT NULL,
49 url_map_entry TEXT DEFAULT '' NOT NULL,
50 host TEXT NOT NULL,
51 task_queue_name TEXT DEFAULT '' NOT NULL,
52 task_name TEXT DEFAULT '' NOT NULL,
53 latency INTEGER DEFAULT 0 NOT NULL,
54 mcycles INTEGER DEFAULT 0 NOT NULL,
55 finished INTEGER DEFAULT 0 NOT NULL
57 """
59 _REQUEST_LOG_ADD_MODULE_COLUMN = """
60 ALTER TABLE RequestLogs
61 ADD COLUMN module TEXT DEFAULT '%s' NOT NULL;
62 """ % appinfo.DEFAULT_MODULE
64 _APP_LOG_CREATE = """
65 CREATE TABLE IF NOT EXISTS AppLogs (
66 id INTEGER NOT NULL PRIMARY KEY,
67 request_id INTEGER NOT NULL,
68 timestamp INTEGER NOT NULL,
69 level INTEGER NOT NULL,
70 message TEXT NOT NULL,
71 FOREIGN KEY(request_id) REFERENCES RequestLogs(id)
73 """
76 class LogServiceStub(apiproxy_stub.APIProxyStub):
77 """Python stub for Log Service service."""
79 THREADSAFE = True
81 _ACCEPTS_REQUEST_ID = True
84 _DEFAULT_READ_COUNT = 20
87 _MIN_COMMIT_INTERVAL = 5
89 def __init__(self, persist=False, logs_path=None, request_data=None):
90 """Initializer.
92 Args:
93 persist: For backwards compatability. Has no effect.
94 logs_path: A str containing the filename to use for logs storage. Defaults
95 to in-memory if unset.
96 request_data: A apiproxy_stub.RequestData instance used to look up state
97 associated with the request that generated an API call.
98 """
100 super(LogServiceStub, self).__init__('logservice',
101 request_data=request_data)
102 self._request_id_to_request_row_id = {}
103 if logs_path is None:
104 logs_path = ':memory:'
105 self._conn = sqlite3.connect(logs_path, check_same_thread=False)
106 self._conn.row_factory = sqlite3.Row
107 self._conn.execute(_REQUEST_LOG_CREATE)
108 self._conn.execute(_APP_LOG_CREATE)
110 column_names = set(c['name'] for c in
111 self._conn.execute('PRAGMA table_info(RequestLogs)'))
112 if 'module' not in column_names:
113 self._conn.execute(_REQUEST_LOG_ADD_MODULE_COLUMN)
115 self._last_commit = time.time()
116 atexit.register(self._conn.commit)
118 @staticmethod
119 def _get_time_usec():
120 return int(time.time() * 1e6)
122 def _maybe_commit(self):
123 now = time.time()
124 if (now - self._last_commit) > self._MIN_COMMIT_INTERVAL:
125 self._conn.commit()
126 self._last_commit = now
128 @apiproxy_stub.Synchronized
129 def start_request(self, request_id, user_request_id, ip, app_id, version_id,
130 nickname, user_agent, host, method, resource, http_version,
131 start_time=None, module=None):
132 """Starts logging for a request.
134 Each start_request call must be followed by a corresponding end_request call
135 to cleanup resources allocated in start_request.
137 Args:
138 request_id: A unique string identifying the request associated with the
139 API call.
140 user_request_id: A user-visible unique string for retrieving the request
141 log at a later time.
142 ip: The user's IP address.
143 app_id: A string representing the application ID that this request
144 corresponds to.
145 version_id: A string representing the version ID that this request
146 corresponds to.
147 nickname: A string representing the user that has made this request (that
148 is, the user's nickname, e.g., 'foobar' for a user logged in as
149 'foobar@gmail.com').
150 user_agent: A string representing the agent used to make this request.
151 host: A string representing the host that received this request.
152 method: A string containing the HTTP method of this request.
153 resource: A string containing the path and query string of this request.
154 http_version: A string containing the HTTP version of this request.
155 start_time: An int containing the start time in micro-seconds. If unset,
156 the current time is used.
157 module: The string name of the module handling this request.
159 if module is None:
160 module = appinfo.DEFAULT_MODULE
161 major_version_id = version_id.split('.', 1)[0]
162 if start_time is None:
163 start_time = self._get_time_usec()
164 cursor = self._conn.execute(
165 'INSERT INTO RequestLogs (user_request_id, ip, app_id, version_id, '
166 'nickname, user_agent, host, start_time, method, resource, '
167 'http_version, module)'
168 ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
169 user_request_id, ip, app_id, major_version_id, nickname, user_agent,
170 host, start_time, method, resource, http_version, module))
171 self._request_id_to_request_row_id[request_id] = cursor.lastrowid
172 self._maybe_commit()
174 @apiproxy_stub.Synchronized
175 def end_request(self, request_id, status, response_size, end_time=None):
176 """Ends logging for a request.
178 Args:
179 request_id: A unique string identifying the request associated with the
180 API call.
181 status: An int containing the HTTP status code for this request.
182 response_size: An int containing the content length of the response.
183 end_time: An int containing the end time in micro-seconds. If unset, the
184 current time is used.
186 row_id = self._request_id_to_request_row_id.pop(request_id, None)
187 if not row_id:
188 return
189 if end_time is None:
190 end_time = self._get_time_usec()
191 self._conn.execute(
192 'UPDATE RequestLogs SET '
193 'status = ?, response_size = ?, end_time = ?, finished = 1 '
194 'WHERE id = ?', (
195 status, response_size, end_time, row_id))
196 self._maybe_commit()
198 def _Dynamic_Flush(self, request, unused_response, request_id):
199 """Writes application-level log messages for a request."""
200 group = log_service_pb.UserAppLogGroup(request.logs())
201 self._insert_app_logs(request_id, group.log_line_list())
203 @apiproxy_stub.Synchronized
204 def _insert_app_logs(self, request_id, log_lines):
205 row_id = self._request_id_to_request_row_id.get(request_id)
206 if row_id is None:
207 return
208 new_app_logs = (self._tuple_from_log_line(row_id, log_line)
209 for log_line in log_lines)
210 self._conn.executemany(
211 'INSERT INTO AppLogs (request_id, timestamp, level, message) VALUES '
212 '(?, ?, ?, ?)', new_app_logs)
213 self._maybe_commit()
215 @staticmethod
216 def _tuple_from_log_line(row_id, log_line):
217 message = log_line.message()
218 if isinstance(message, str):
219 message = codecs.decode(message, 'utf-8', 'replace')
220 return (row_id, log_line.timestamp_usec(), log_line.level(), message)
222 @apiproxy_stub.Synchronized
223 def _Dynamic_Read(self, request, response, request_id):
224 if (request.module_version_size() < 1 and
225 request.version_id_size() < 1 and
226 request.request_id_size() < 1):
227 raise apiproxy_errors.ApplicationError(
228 log_service_pb.LogServiceError.INVALID_REQUEST)
230 if request.module_version_size() > 0 and request.version_id_size() > 0:
231 raise apiproxy_errors.ApplicationError(
232 log_service_pb.LogServiceError.INVALID_REQUEST)
234 if (request.request_id_size() and
235 (request.has_start_time() or request.has_end_time() or
236 request.has_offset())):
237 raise apiproxy_errors.ApplicationError(
238 log_service_pb.LogServiceError.INVALID_REQUEST)
240 if request.request_id_size():
241 for request_id in request.request_id_list():
242 log_row = self._conn.execute(
243 'SELECT * FROM RequestLogs WHERE user_request_id = ?',
244 (request_id,)).fetchone()
245 if log_row:
246 log = response.add_log()
247 self._fill_request_log(log_row, log, request.include_app_logs())
248 return
250 if request.has_count():
251 count = request.count()
252 else:
253 count = self._DEFAULT_READ_COUNT
254 filters, values = self._extract_read_filters(request)
255 filter_string = ' WHERE %s' % ' and '.join(filters)
257 if request.has_minimum_log_level():
258 query = ('SELECT * FROM RequestLogs INNER JOIN AppLogs ON '
259 'RequestLogs.id = AppLogs.request_id%s GROUP BY '
260 'RequestLogs.id ORDER BY id DESC')
261 else:
262 query = 'SELECT * FROM RequestLogs%s ORDER BY id DESC'
263 logs = self._conn.execute(query % filter_string,
264 values).fetchmany(count + 1)
265 if logging.getLogger(__name__).isEnabledFor(logging.DEBUG):
266 self._debug_query(filter_string, values, len(logs))
267 for log_row in logs[:count]:
268 log = response.add_log()
269 self._fill_request_log(log_row, log, request.include_app_logs())
270 if len(logs) > count:
271 response.mutable_offset().set_request_id(str(logs[-2]['id']))
273 def _debug_query(self, filter_string, values, result_count):
274 logging.debug('\n\n')
275 logging.debug(filter_string)
276 logging.debug(values)
277 logging.debug('%d results.', result_count)
278 logging.debug('DB dump:')
279 for l in self._conn.execute('SELECT * FROM RequestLogs'):
280 logging.debug('%r %r %d %d %s', l['module'], l['version_id'],
281 l['start_time'], l['end_time'],
282 l['finished'] and 'COMPLETE' or 'INCOMPLETE')
284 def _fill_request_log(self, log_row, log, include_app_logs):
285 log.set_request_id(str(log_row['user_request_id']))
286 log.set_app_id(log_row['app_id'])
287 log.set_version_id(log_row['version_id'])
288 log.set_module_id(log_row['module'])
289 log.set_ip(log_row['ip'])
290 log.set_nickname(log_row['nickname'])
291 log.set_start_time(log_row['start_time'])
292 log.set_host(log_row['host'])
293 log.set_end_time(log_row['end_time'])
294 log.set_method(log_row['method'])
295 log.set_resource(log_row['resource'])
296 log.set_status(log_row['status'])
297 log.set_response_size(log_row['response_size'])
298 log.set_http_version(log_row['http_version'])
299 log.set_user_agent(log_row['user_agent'])
300 log.set_url_map_entry(log_row['url_map_entry'])
301 log.set_latency(log_row['latency'])
302 log.set_mcycles(log_row['mcycles'])
303 log.set_finished(log_row['finished'])
304 log.mutable_offset().set_request_id(str(log_row['id']))
305 time_seconds = (log_row['end_time'] or log_row['start_time']) / 10**6
306 date_string = time.strftime('%d/%b/%Y:%H:%M:%S %z',
307 time.localtime(time_seconds))
308 log.set_combined('%s - %s [%s] "%s %s %s" %d %d - "%s"' %
309 (log_row['ip'], log_row['nickname'], date_string,
310 log_row['method'], log_row['resource'],
311 log_row['http_version'], log_row['status'] or 0,
312 log_row['response_size'] or 0, log_row['user_agent']))
313 if include_app_logs:
314 log_messages = self._conn.execute(
315 'SELECT timestamp, level, message FROM AppLogs '
316 'WHERE request_id = ?',
317 (log_row['id'],)).fetchall()
318 for message in log_messages:
319 line = log.add_line()
320 line.set_time(message['timestamp'])
321 line.set_level(message['level'])
322 line.set_log_message(message['message'])
324 @staticmethod
325 def _extract_read_filters(request):
326 """Extracts SQL filters from the LogReadRequest.
328 Args:
329 request: the incoming LogReadRequest.
330 Returns:
331 a pair of (filters, values); filters is a list of SQL filter expressions,
332 to be joined by AND operators; values is a list of values to be
333 interpolated into the filter expressions by the db library.
335 filters = []
336 values = []
338 module_filters = []
339 module_values = []
340 for module_version in request.module_version_list():
341 module_filters.append('(version_id = ? AND module = ?)')
342 module_values.append(module_version.version_id())
343 module = appinfo.DEFAULT_MODULE
344 if module_version.has_module_id():
345 module = module_version.module_id()
346 module_values.append(module)
347 filters.append('(' + ' or '.join(module_filters) + ')')
348 values += module_values
350 if request.has_offset():
351 filters.append('RequestLogs.id < ?')
352 values.append(int(request.offset().request_id()))
353 if request.has_minimum_log_level():
354 filters.append('AppLogs.level >= ?')
355 values.append(request.minimum_log_level())
362 finished_filter = 'finished = 1 '
363 finished_filter_values = []
364 unfinished_filter = 'finished = 0'
365 unfinished_filter_values = []
367 if request.has_start_time():
368 finished_filter += ' and end_time >= ? '
369 finished_filter_values.append(request.start_time())
370 unfinished_filter += ' and start_time >= ? '
371 unfinished_filter_values.append(request.start_time())
372 if request.has_end_time():
373 finished_filter += ' and end_time < ? '
374 finished_filter_values.append(request.end_time())
375 unfinished_filter += ' and start_time < ? '
376 unfinished_filter_values.append(request.end_time())
378 if request.include_incomplete():
379 filters.append(
380 '((' + finished_filter + ') or (' + unfinished_filter + '))')
381 values += finished_filter_values + unfinished_filter_values
382 else:
383 filters.append(finished_filter)
384 values += finished_filter_values
386 return filters, values
388 def _Dynamic_SetStatus(self, unused_request, unused_response,
389 unused_request_id):
390 raise NotImplementedError
392 def _Dynamic_Usage(self, unused_request, unused_response, unused_request_id):
393 raise apiproxy_errors.CapabilityDisabledError('Usage not allowed in tests.')