# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Stub implementation for Log Service that uses sqlite."""
import atexit
import codecs
import logging
import sqlite3
import time

from google.appengine.api import apiproxy_stub
from google.appengine.api import appinfo
from google.appengine.api.logservice import log_service_pb
from google.appengine.runtime import apiproxy_errors
# DDL for the table holding one row per HTTP request. Besides the bookkeeping
# columns with defaults, it declares every column that start_request inserts
# (user_request_id, ip, app_id, version_id, nickname, user_agent, host,
# start_time, method, resource, http_version, module), so the INSERT succeeds
# on a freshly created database. IF NOT EXISTS makes the statement safe to run
# on every start-up.
_REQUEST_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS RequestLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  user_request_id TEXT NOT NULL,
  app_id TEXT NOT NULL,
  version_id TEXT NOT NULL,
  module TEXT NOT NULL,
  ip TEXT NOT NULL,
  nickname TEXT NOT NULL,
  start_time INTEGER NOT NULL,
  end_time INTEGER DEFAULT 0 NOT NULL,
  method TEXT NOT NULL,
  resource TEXT NOT NULL,
  http_version TEXT NOT NULL,
  status INTEGER DEFAULT 0 NOT NULL,
  response_size INTEGER DEFAULT 0 NOT NULL,
  user_agent TEXT NOT NULL,
  url_map_entry TEXT DEFAULT '' NOT NULL,
  host TEXT NOT NULL,
  task_queue_name TEXT DEFAULT '' NOT NULL,
  task_name TEXT DEFAULT '' NOT NULL,
  latency INTEGER DEFAULT 0 NOT NULL,
  mcycles INTEGER DEFAULT 0 NOT NULL,
  finished INTEGER DEFAULT 0 NOT NULL
);
"""
# Migration statement that adds the `module` column to an existing RequestLogs
# table, giving already-stored rows appinfo.DEFAULT_MODULE as their value.
_REQUEST_LOG_ADD_MODULE_COLUMN = """
ALTER TABLE RequestLogs
ADD COLUMN module TEXT DEFAULT '%s' NOT NULL;
""" % appinfo.DEFAULT_MODULE
# DDL for the table holding application-level log lines. Each line points at
# the RequestLogs row (request_id -> RequestLogs.id) of the request that
# emitted it. IF NOT EXISTS makes the statement safe to run on every start-up.
_APP_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS AppLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  request_id INTEGER NOT NULL,
  timestamp INTEGER NOT NULL,
  level INTEGER NOT NULL,
  message TEXT NOT NULL,
  FOREIGN KEY(request_id) REFERENCES RequestLogs(id)
);
"""
class LogServiceStub(apiproxy_stub.APIProxyStub):
  """Python stub for Log Service service.

  Request logs and application log lines are kept in a sqlite database (see
  the RequestLogs and AppLogs tables). start_request/end_request bracket a
  request; _Dynamic_Flush attaches application log lines to it; _Dynamic_Read
  serves queries over what has been stored.
  """

  # NOTE(review): presumably consulted by the APIProxyStub base class to decide
  # whether _Dynamic_* handlers are passed a request_id argument -- confirm.
  _ACCEPTS_REQUEST_ID = True

  # Number of request logs returned by _Dynamic_Read when the read request does
  # not specify a count.
  _DEFAULT_READ_COUNT = 20

  # Minimum number of seconds that must elapse between two commits issued by
  # _maybe_commit.
  _MIN_COMMIT_INTERVAL = 5

  def __init__(self, persist=False, logs_path=None, request_data=None):
    """Initializer.

    Args:
      persist: For backwards compatibility. Has no effect.
      logs_path: A str containing the filename to use for logs storage. Defaults
        to in-memory if unset.
      request_data: A apiproxy_stub.RequestData instance used to look up state
        associated with the request that generated an API call.
    """
    super(LogServiceStub, self).__init__('logservice',
                                         request_data=request_data)
    # Maps the request_id given to start_request to the RequestLogs row id of
    # the still-open request.
    self._request_id_to_request_row_id = {}
    if logs_path is None:
      logs_path = ':memory:'
    self._conn = sqlite3.connect(logs_path, check_same_thread=False)
    # sqlite3.Row allows rows to be indexed by column name.
    self._conn.row_factory = sqlite3.Row
    self._conn.execute(_REQUEST_LOG_CREATE)
    self._conn.execute(_APP_LOG_CREATE)

    # A RequestLogs table that already existed may lack the `module` column;
    # add it in that case.
    column_names = {
        column['name']
        for column in self._conn.execute('PRAGMA table_info(RequestLogs)')}
    if 'module' not in column_names:
      self._conn.execute(_REQUEST_LOG_ADD_MODULE_COLUMN)

    self._last_commit = time.time()
    # Flush whatever is still uncommitted when the interpreter exits.
    atexit.register(self._conn.commit)

  @staticmethod
  def _get_time_usec():
    """Returns the current time as an int number of microseconds."""
    return int(time.time() * 1e6)

  def _maybe_commit(self):
    """Commits the connection if the last commit is old enough.

    A commit is issued only when more than _MIN_COMMIT_INTERVAL seconds have
    passed since the previous one.
    """
    now = time.time()
    if (now - self._last_commit) > self._MIN_COMMIT_INTERVAL:
      self._conn.commit()
      self._last_commit = now

  @apiproxy_stub.Synchronized
  def start_request(self, request_id, user_request_id, ip, app_id, version_id,
                    nickname, user_agent, host, method, resource, http_version,
                    start_time=None, module=None):
    """Starts logging for a request.

    Each start_request call must be followed by a corresponding end_request call
    to cleanup resources allocated in start_request.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      user_request_id: A user-visible unique string for retrieving the request
        log later.
      ip: The user's IP address.
      app_id: A string representing the application ID that this request
        corresponds to.
      version_id: A string representing the version ID that this request
        corresponds to. Only the part before the first '.' is stored; None is
        stored as 'NO-VERSION'.
      nickname: A string representing the user that has made this request (that
        is, the user's nickname, e.g., 'foobar').
      user_agent: A string representing the agent used to make this request.
      host: A string representing the host that received this request.
      method: A string containing the HTTP method of this request.
      resource: A string containing the path and query string of this request.
      http_version: A string containing the HTTP version of this request.
      start_time: An int containing the start time in micro-seconds. If unset,
        the current time is used.
      module: The string name of the module handling this request. If unset,
        appinfo.DEFAULT_MODULE is used.
    """
    if module is None:
      module = appinfo.DEFAULT_MODULE
    if version_id is None:
      version_id = 'NO-VERSION'
    major_version_id = version_id.split('.', 1)[0]
    if start_time is None:
      start_time = self._get_time_usec()
    cursor = self._conn.execute(
        'INSERT INTO RequestLogs (user_request_id, ip, app_id, version_id, '
        'nickname, user_agent, host, start_time, method, resource, '
        'http_version, module)'
        ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
            user_request_id, ip, app_id, major_version_id, nickname, user_agent,
            host, start_time, method, resource, http_version, module))
    # Remember the row so end_request and _insert_app_logs can find it.
    self._request_id_to_request_row_id[request_id] = cursor.lastrowid
    self._maybe_commit()

  @apiproxy_stub.Synchronized
  def end_request(self, request_id, status, response_size, end_time=None):
    """Ends logging for a request.

    Does nothing if request_id does not refer to a request opened with
    start_request (or if it has already been ended).

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      status: An int containing the HTTP status code for this request.
      response_size: An int containing the content length of the response.
      end_time: An int containing the end time in micro-seconds. If unset, the
        current time is used.
    """
    row_id = self._request_id_to_request_row_id.pop(request_id, None)
    if row_id is None:
      return
    if end_time is None:
      end_time = self._get_time_usec()
    self._conn.execute(
        'UPDATE RequestLogs SET '
        'status = ?, response_size = ?, end_time = ?, finished = 1 '
        'WHERE id = ?', (
            status, response_size, end_time, row_id))
    self._maybe_commit()

  def _Dynamic_Flush(self, request, unused_response, request_id):
    """Writes application-level log messages for a request."""
    group = log_service_pb.UserAppLogGroup(request.logs())
    self._insert_app_logs(request_id, group.log_line_list())

  @apiproxy_stub.Synchronized
  def _insert_app_logs(self, request_id, log_lines):
    """Stores application log lines against an open request.

    Lines for a request_id that is not currently open are dropped, since there
    is no RequestLogs row to attach them to.

    Args:
      request_id: The request_id that was passed to start_request.
      log_lines: An iterable of log line objects exposing message(),
        timestamp_usec() and level().
    """
    row_id = self._request_id_to_request_row_id.get(request_id)
    if row_id is None:
      return
    new_app_logs = (self._tuple_from_log_line(row_id, log_line)
                    for log_line in log_lines)
    self._conn.executemany(
        'INSERT INTO AppLogs (request_id, timestamp, level, message) VALUES '
        '(?, ?, ?, ?)', new_app_logs)
    self._maybe_commit()

  @staticmethod
  def _tuple_from_log_line(row_id, log_line):
    """Converts a log line into the parameter tuple for an AppLogs INSERT.

    Args:
      row_id: The RequestLogs row id the line belongs to.
      log_line: A log line object exposing message(), timestamp_usec() and
        level().

    Returns:
      A (row_id, timestamp_usec, level, message) tuple where message is text.
    """
    message = log_line.message()
    # Only byte strings need decoding. `bytes` is an alias of `str` on
    # Python 2, so this matches the historical check there, while on Python 3
    # it avoids calling a utf-8 decode on text (which raises TypeError).
    if isinstance(message, bytes):
      message = codecs.decode(message, 'utf-8', 'replace')
    return (row_id, log_line.timestamp_usec(), log_line.level(), message)

  @apiproxy_stub.Synchronized
  def _Dynamic_Read(self, request, response, request_id):
    """Fills response with the request logs selected by request.

    Two modes are supported: lookup by explicit user request ids, or a filtered
    scan ordered from newest to oldest row that returns at most `count` logs
    plus an offset for fetching the next page.

    Args:
      request: The incoming read request.
      response: The response message to add logs to.
      request_id: Identifier of the request making this API call; not used.

    Raises:
      apiproxy_errors.ApplicationError: with INVALID_REQUEST if the request
        names no module version, version id or request id; names both module
        versions and version ids; or combines request ids with a start time,
        end time or offset.
    """
    if (request.module_version_size() < 1 and
        request.version_id_size() < 1 and
        request.request_id_size() < 1):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.module_version_size() > 0 and request.version_id_size() > 0:
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if (request.request_id_size() and
        (request.has_start_time() or request.has_end_time() or
         request.has_offset())):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.request_id_size():
      # Direct lookup mode: unknown ids are silently skipped.
      for user_request_id in request.request_id_list():
        log_row = self._conn.execute(
            'SELECT * FROM RequestLogs WHERE user_request_id = ?',
            (user_request_id,)).fetchone()
        if log_row:
          log = response.add_log()
          self._fill_request_log(log_row, log, request.include_app_logs())
      return

    if request.has_count():
      count = request.count()
    else:
      count = self._DEFAULT_READ_COUNT
    filters, values = self._extract_read_filters(request)
    filter_string = ' WHERE %s' % ' and '.join(filters)

    if request.has_minimum_log_level():
      # The level filter refers to AppLogs, so join it in and collapse the
      # result back to one row per request.
      query = ('SELECT * FROM RequestLogs INNER JOIN AppLogs ON '
               'RequestLogs.id = AppLogs.request_id%s GROUP BY '
               'RequestLogs.id ORDER BY id DESC')
    else:
      query = 'SELECT * FROM RequestLogs%s ORDER BY id DESC'
    # Fetch one row more than requested to learn whether another page exists.
    logs = self._conn.execute(query % filter_string,
                              values).fetchmany(count + 1)
    if logging.getLogger(__name__).isEnabledFor(logging.DEBUG):
      self._debug_query(filter_string, values, len(logs))
    for log_row in logs[:count]:
      log = response.add_log()
      self._fill_request_log(log_row, log, request.include_app_logs())
    # The continuation offset is the id of the last row actually returned.
    # Guard on count so a zero-sized page does not index an absent row.
    if count > 0 and len(logs) > count:
      response.mutable_offset().set_request_id(str(logs[count - 1]['id']))

  def _debug_query(self, filter_string, values, result_count):
    """Logs a read query and a dump of all stored requests at debug level.

    Args:
      filter_string: The WHERE clause used for the query.
      values: The values bound to the placeholders in filter_string.
      result_count: The number of rows the query fetched.
    """
    logging.debug('\n\n')
    logging.debug(filter_string)
    logging.debug(values)
    logging.debug('%d results.', result_count)
    logging.debug('DB dump:')
    for row in self._conn.execute('SELECT * FROM RequestLogs'):
      state = 'COMPLETE' if row['finished'] else 'INCOMPLETE'
      logging.debug('%r %r %d %d %s', row['module'], row['version_id'],
                    row['start_time'], row['end_time'], state)

  def _fill_request_log(self, log_row, log, include_app_logs):
    """Copies a RequestLogs row into a request log message.

    Args:
      log_row: A row from the RequestLogs table, indexable by column name.
      log: The request log message to populate.
      include_app_logs: If true, the request's AppLogs lines are added to log
        as well.
    """
    log.set_request_id(str(log_row['user_request_id']))
    log.set_app_id(log_row['app_id'])
    log.set_version_id(log_row['version_id'])
    log.set_module_id(log_row['module'])
    log.set_ip(log_row['ip'])
    log.set_nickname(log_row['nickname'])
    log.set_start_time(log_row['start_time'])
    log.set_host(log_row['host'])
    log.set_end_time(log_row['end_time'])
    log.set_method(log_row['method'])
    log.set_resource(log_row['resource'])
    log.set_status(log_row['status'])
    log.set_response_size(log_row['response_size'])
    log.set_http_version(log_row['http_version'])
    log.set_user_agent(log_row['user_agent'])
    log.set_url_map_entry(log_row['url_map_entry'])
    log.set_latency(log_row['latency'])
    log.set_mcycles(log_row['mcycles'])
    log.set_finished(log_row['finished'])
    log.mutable_offset().set_request_id(str(log_row['id']))
    # Stored times are in microseconds; an unfinished request has end_time 0,
    # in which case its start time is used for the combined log line. Floor
    # division keeps the value integral on both Python 2 and Python 3.
    time_seconds = (log_row['end_time'] or log_row['start_time']) // 10**6
    date_string = time.strftime('%d/%b/%Y:%H:%M:%S %z',
                                time.localtime(time_seconds))
    log.set_combined('%s - %s [%s] "%s %s %s" %d %d - "%s"' %
                     (log_row['ip'], log_row['nickname'], date_string,
                      log_row['method'], log_row['resource'],
                      log_row['http_version'], log_row['status'] or 0,
                      log_row['response_size'] or 0, log_row['user_agent']))
    if include_app_logs:
      log_messages = self._conn.execute(
          'SELECT timestamp, level, message FROM AppLogs '
          'WHERE request_id = ?',
          (log_row['id'],)).fetchall()
      for message in log_messages:
        line = log.add_line()
        line.set_time(message['timestamp'])
        line.set_level(message['level'])
        line.set_log_message(message['message'])

  @staticmethod
  def _extract_read_filters(request):
    """Extracts SQL filters from the LogReadRequest.

    Args:
      request: the incoming LogReadRequest.

    Returns:
      a pair of (filters, values); filters is a list of SQL filter expressions,
      to be joined by AND operators; values is a list of values to be
      interpolated into the filter expressions by the db library.

    Raises:
      apiproxy_errors.ApplicationError: with INVALID_REQUEST if the request's
        offset is not an integer.
    """
    filters = []
    values = []

    # One (version, module) alternative per requested module version.
    module_filters = []
    module_values = []
    for module_version in request.module_version_list():
      module_filters.append('(version_id = ? AND module = ?)')
      module_values.append(module_version.version_id())
      module = appinfo.DEFAULT_MODULE
      if module_version.has_module_id():
        module = module_version.module_id()
      module_values.append(module)
    if module_filters:
      filters.append('(' + ' or '.join(module_filters) + ')')
      values += module_values

    if request.has_offset():
      # Offsets are RequestLogs row ids rendered as strings; reject anything
      # that does not parse back to an int.
      try:
        offset_row_id = int(request.offset().request_id())
      except ValueError:
        logging.error('Bad offset in log request: "%s"', request.offset())
        raise apiproxy_errors.ApplicationError(
            log_service_pb.LogServiceError.INVALID_REQUEST)
      filters.append('RequestLogs.id < ?')
      values.append(offset_row_id)
    if request.has_minimum_log_level():
      filters.append('AppLogs.level >= ?')
      values.append(request.minimum_log_level())

    # Finished requests are matched on their end time. Unfinished requests
    # have no end time yet, so the same bounds are applied to their start
    # time instead.
    finished_filter = 'finished = 1 '
    finished_filter_values = []
    unfinished_filter = 'finished = 0'
    unfinished_filter_values = []

    if request.has_start_time():
      finished_filter += ' and end_time >= ? '
      finished_filter_values.append(request.start_time())
      unfinished_filter += ' and start_time >= ? '
      unfinished_filter_values.append(request.start_time())
    if request.has_end_time():
      finished_filter += ' and end_time < ? '
      finished_filter_values.append(request.end_time())
      unfinished_filter += ' and start_time < ? '
      unfinished_filter_values.append(request.end_time())

    if request.include_incomplete():
      filters.append(
          '((' + finished_filter + ') or (' + unfinished_filter + '))')
      values += finished_filter_values + unfinished_filter_values
    else:
      filters.append(finished_filter)
      values += finished_filter_values

    return filters, values

  def _Dynamic_SetStatus(self, unused_request, unused_response,
                         unused_request_id):
    """Not supported by this stub; always raises NotImplementedError."""
    raise NotImplementedError

  def _Dynamic_Usage(self, unused_request, unused_response, unused_request_id):
    """Not available in tests; always raises CapabilityDisabledError."""
    raise apiproxy_errors.CapabilityDisabledError('Usage not allowed in tests.')