App Engine Python SDK version 1.7.7
gae.git: python/google/appengine/api/logservice/logservice_stub.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17 """Stub implementation for Log Service that uses sqlite."""

import atexit
import time

import sqlite3

from google.appengine.api import apiproxy_stub
from google.appengine.api.logservice import log_service_pb
from google.appengine.runtime import apiproxy_errors


_REQUEST_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS RequestLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  user_request_id TEXT NOT NULL,
  app_id TEXT NOT NULL,
  version_id TEXT NOT NULL,
  ip TEXT NOT NULL,
  nickname TEXT NOT NULL,
  start_time INTEGER NOT NULL,
  end_time INTEGER DEFAULT 0 NOT NULL,
  method TEXT NOT NULL,
  resource TEXT NOT NULL,
  http_version TEXT NOT NULL,
  status INTEGER DEFAULT 0 NOT NULL,
  response_size INTEGER DEFAULT 0 NOT NULL,
  user_agent TEXT NOT NULL,
  url_map_entry TEXT DEFAULT '' NOT NULL,
  host TEXT NOT NULL,
  task_queue_name TEXT DEFAULT '' NOT NULL,
  task_name TEXT DEFAULT '' NOT NULL,
  latency INTEGER DEFAULT 0 NOT NULL,
  mcycles INTEGER DEFAULT 0 NOT NULL,
  finished INTEGER DEFAULT 0 NOT NULL
54 """

_APP_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS AppLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  request_id INTEGER NOT NULL,
  timestamp INTEGER NOT NULL,
  level INTEGER NOT NULL,
  message TEXT NOT NULL,
  FOREIGN KEY(request_id) REFERENCES RequestLogs(id)
65 """


class LogServiceStub(apiproxy_stub.APIProxyStub):
  """Python stub for the Log Service API."""

  _ACCEPTS_REQUEST_ID = True

  # Number of request logs returned when the read request specifies no count.
  _DEFAULT_READ_COUNT = 20

  # Minimum number of seconds between commits to the sqlite database.
  _MIN_COMMIT_INTERVAL = 5

  def __init__(self, persist=False, logs_path=None, request_data=None):
    """Initializer.

    Args:
      persist: For backwards compatibility. Has no effect.
      logs_path: A str containing the filename to use for logs storage.
        Defaults to in-memory if unset.
      request_data: An apiproxy_stub.RequestData instance used to look up state
        associated with the request that generated an API call.
    """
    super(LogServiceStub, self).__init__('logservice',
                                         request_data=request_data)
    self._request_id_to_request_row_id = {}
    if logs_path is None:
      logs_path = ':memory:'
    self._conn = sqlite3.connect(logs_path, check_same_thread=False)
    self._conn.row_factory = sqlite3.Row
    self._conn.execute(_REQUEST_LOG_CREATE)
    self._conn.execute(_APP_LOG_CREATE)
    self._last_commit = time.time()
    atexit.register(self._conn.commit)
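
  # Illustrative sketch (not part of the stub): in a local test this stub is
  # typically constructed and registered with the API proxy before log
  # service calls are issued. The 'logservice' name matches the one passed to
  # __init__ above; apiproxy_stub_map ships with the same SDK.
  #
  #   from google.appengine.api import apiproxy_stub_map
  #   stub = LogServiceStub(logs_path=':memory:')
  #   apiproxy_stub_map.apiproxy.RegisterStub('logservice', stub)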

  @staticmethod
  def _get_time_usec():
    return int(time.time() * 1e6)

  def _maybe_commit(self):
    now = time.time()
    if (now - self._last_commit) > self._MIN_COMMIT_INTERVAL:
      self._conn.commit()
      self._last_commit = now

  @apiproxy_stub.Synchronized
  def start_request(self, request_id, user_request_id, ip, app_id, version_id,
                    nickname, user_agent, host, method, resource, http_version,
                    start_time=None):
    """Starts logging for a request.

    Each start_request call must be followed by a corresponding end_request
    call to clean up resources allocated in start_request.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      user_request_id: A user-visible unique string for retrieving the request
        log at a later time.
      ip: The user's IP address.
      app_id: A string representing the application ID that this request
        corresponds to.
      version_id: A string representing the version ID that this request
        corresponds to.
      nickname: A string representing the user that has made this request
        (that is, the user's nickname, e.g., 'foobar' for a user logged in as
        'foobar@gmail.com').
      user_agent: A string representing the agent used to make this request.
      host: A string representing the host that received this request.
      method: A string containing the HTTP method of this request.
      resource: A string containing the path and query string of this request.
      http_version: A string containing the HTTP version of this request.
      start_time: An int containing the start time in micro-seconds. If unset,
        the current time is used.
    """
    major_version_id = version_id.split('.', 1)[0]
    if start_time is None:
      start_time = self._get_time_usec()
    cursor = self._conn.execute(
        'INSERT INTO RequestLogs (user_request_id, ip, app_id, version_id, '
        'nickname, user_agent, host, start_time, method, resource, '
        'http_version)'
        ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
            user_request_id, ip, app_id, major_version_id, nickname,
            user_agent, host, start_time, method, resource, http_version))
    self._request_id_to_request_row_id[request_id] = cursor.lastrowid
    self._maybe_commit()

  @apiproxy_stub.Synchronized
  def end_request(self, request_id, status, response_size, end_time=None):
    """Ends logging for a request.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      status: An int containing the HTTP status code for this request.
      response_size: An int containing the content length of the response.
      end_time: An int containing the end time in micro-seconds. If unset, the
        current time is used.
    """
    row_id = self._request_id_to_request_row_id.pop(request_id, None)
    if not row_id:
      return
    if end_time is None:
      end_time = self._get_time_usec()
    self._conn.execute(
        'UPDATE RequestLogs SET '
        'status = ?, response_size = ?, end_time = ?, finished = 1 '
        'WHERE id = ?', (
            status, response_size, end_time, row_id))
    self._maybe_commit()
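
  # Illustrative sketch (not part of the stub): a request is logged by pairing
  # start_request with end_request; the request_id ties the two calls together
  # and is also the key used when _Dynamic_Flush attaches app logs. The
  # literal values below are made up for the example.
  #
  #   stub.start_request(
  #       request_id='req-1', user_request_id='0001', ip='127.0.0.1',
  #       app_id='myapp', version_id='1.2345', nickname='', user_agent='curl',
  #       host='localhost:8080', method='GET', resource='/',
  #       http_version='HTTP/1.1')
  #   ...  # the request runs and may flush application logs
  #   stub.end_request(request_id='req-1', status=200, response_size=1234)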

  def _Dynamic_Flush(self, request, unused_response, request_id):
    """Writes application-level log messages for a request."""
    group = log_service_pb.UserAppLogGroup(request.logs())
    self._insert_app_logs(request_id, group.log_line_list())

  @apiproxy_stub.Synchronized
  def _insert_app_logs(self, request_id, log_lines):
    row_id = self._request_id_to_request_row_id.get(request_id)
    if row_id is None:
      return
    new_app_logs = (self._tuple_from_log_line(row_id, log_line)
                    for log_line in log_lines)
    self._conn.executemany(
        'INSERT INTO AppLogs (request_id, timestamp, level, message) VALUES '
        '(?, ?, ?, ?)', new_app_logs)
    self._maybe_commit()

  @staticmethod
  def _tuple_from_log_line(row_id, log_line):
    message = log_line.message()
    if isinstance(message, str):
      message = message.decode('utf-8')
    return (row_id, log_line.timestamp_usec(), log_line.level(), message)

  @apiproxy_stub.Synchronized
  def _Dynamic_Read(self, request, response, request_id):
    if (request.server_version_size() < 1 and
        request.version_id_size() < 1 and
        request.request_id_size() < 1):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.server_version_size() > 0 and request.version_id_size() > 0:
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if (request.request_id_size() and
        (request.has_start_time() or request.has_end_time() or
         request.has_offset())):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.request_id_size():
      for request_id in request.request_id_list():
        log_row = self._conn.execute(
            'SELECT * FROM RequestLogs WHERE user_request_id = ?',
            (request_id,)).fetchone()
        if log_row:
          log = response.add_log()
          self._fill_request_log(log_row, log, request.include_app_logs())
      return

    if request.has_count():
      count = request.count()
    else:
      count = self._DEFAULT_READ_COUNT
    filters = self._extract_read_filters(request)
    filter_string = ' WHERE %s' % ' and '.join(f[0] for f in filters)

    if request.has_minimum_log_level():
      query = ('SELECT * FROM RequestLogs INNER JOIN AppLogs ON '
               'RequestLogs.id = AppLogs.request_id%s GROUP BY '
               'RequestLogs.id ORDER BY id DESC')
    else:
      query = 'SELECT * FROM RequestLogs%s ORDER BY id DESC'
    logs = self._conn.execute(query % filter_string,
                              tuple(f[1] for f in filters)).fetchmany(count + 1)
    for log_row in logs[:count]:
      log = response.add_log()
      self._fill_request_log(log_row, log, request.include_app_logs())
    if len(logs) > count:
      response.mutable_offset().set_request_id(str(logs[-2]['id']))
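
  # Illustrative note (assumed example, not part of the stub): for a read that
  # filters on version '1' with include_incomplete() false and no
  # minimum_log_level(), the statement built above is roughly
  #
  #   SELECT * FROM RequestLogs WHERE version_id = ? and finished = ?
  #   ORDER BY id DESC
  #
  # executed with parameters ('1', 1). When more than `count` rows come back,
  # the response offset carries the id of the last returned row so a follow-up
  # read continues with `RequestLogs.id < offset`.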

  def _fill_request_log(self, log_row, log, include_app_logs):
    log.set_request_id(str(log_row['user_request_id']))
    log.set_app_id(log_row['app_id'])
    log.set_version_id(log_row['version_id'])
    log.set_ip(log_row['ip'])
    log.set_nickname(log_row['nickname'])
    log.set_start_time(log_row['start_time'])
    log.set_host(log_row['host'])
    log.set_end_time(log_row['end_time'])
    log.set_method(log_row['method'])
    log.set_resource(log_row['resource'])
    log.set_status(log_row['status'])
    log.set_response_size(log_row['response_size'])
    log.set_http_version(log_row['http_version'])
    log.set_user_agent(log_row['user_agent'])
    log.set_url_map_entry(log_row['url_map_entry'])
    log.set_latency(log_row['latency'])
    log.set_mcycles(log_row['mcycles'])
    log.set_finished(log_row['finished'])
    log.mutable_offset().set_request_id(str(log_row['id']))
    time_seconds = (log_row['end_time'] or log_row['start_time']) / 10**6
    date_string = time.strftime('%d/%b/%Y:%H:%M:%S %z',
                                time.localtime(time_seconds))
    log.set_combined('%s - %s [%s] "%s %s %s" %d %d - "%s"' %
                     (log_row['ip'], log_row['nickname'], date_string,
                      log_row['method'], log_row['resource'],
                      log_row['http_version'], log_row['status'] or 0,
                      log_row['response_size'] or 0, log_row['user_agent']))
    if include_app_logs:
      log_messages = self._conn.execute(
          'SELECT timestamp, level, message FROM AppLogs '
          'WHERE request_id = ?',
          (log_row['id'],)).fetchall()
      for message in log_messages:
        line = log.add_line()
        line.set_time(message['timestamp'])
        line.set_level(message['level'])
        line.set_log_message(message['message'])
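
  # Illustrative note (made-up values): the combined entry produced above
  # follows an Apache-style access log layout, e.g.
  #
  #   127.0.0.1 - test [01/Apr/2013:12:00:00 -0700] "GET / HTTP/1.1" 200 1234 - "curl"
  #
  # built from the ip, nickname, request line, status, response size and user
  # agent stored in the RequestLogs row.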

  @staticmethod
  def _extract_read_filters(request):
    """Extracts SQL WHERE-clause filters from a LogReadRequest.

    Args:
      request: The LogReadRequest being served.

    Returns:
      A list of (filter_string, parameter) tuples.
    """
    if request.server_version(0).has_server_id():
      server_version = ':'.join([request.server_version(0).server_id(),
                                 request.server_version(0).version_id()])
    else:
      server_version = request.server_version(0).version_id()
    filters = [('version_id = ?', server_version)]
    if request.has_start_time():
      filters.append(('start_time >= ?', request.start_time()))
    if request.has_end_time():
      filters.append(('end_time < ?', request.end_time()))
    if request.has_offset():
      filters.append(('RequestLogs.id < ?', int(request.offset().request_id())))
    if not request.include_incomplete():
      filters.append(('finished = ?', 1))
    if request.has_minimum_log_level():
      filters.append(('AppLogs.level >= ?', request.minimum_log_level()))
    return filters
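
  # Illustrative note (assumed values): for a request asking for version '1',
  # finished requests only, with an offset carried over from a previous read,
  # the method above would return something like
  #
  #   [('version_id = ?', '1'),
  #    ('RequestLogs.id < ?', 42),
  #    ('finished = ?', 1)]
  #
  # which _Dynamic_Read joins with ' and ' into the WHERE clause.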

  def _Dynamic_SetStatus(self, unused_request, unused_response,
                         unused_request_id):
    raise NotImplementedError

  def _Dynamic_Usage(self, unused_request, unused_response, unused_request_id):
    raise apiproxy_errors.CapabilityDisabledError('Usage not allowed in tests.')