3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub implementation for Log Service that uses sqlite."""
import atexit
import codecs
import logging
import sqlite3
import time

from google.appengine.api import apiproxy_stub
from google.appengine.api import appinfo
from google.appengine.api.logservice import log_service_pb
from google.appengine.runtime import apiproxy_errors
# Schema for per-request logs.  Column set must stay in sync with the INSERT
# in LogServiceStub.start_request and the UPDATE in LogServiceStub.end_request.
_REQUEST_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS RequestLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  user_request_id TEXT NOT NULL,
  app_id TEXT NOT NULL,
  version_id TEXT NOT NULL,
  module TEXT NOT NULL,
  ip TEXT NOT NULL,
  nickname TEXT NOT NULL,
  start_time INTEGER NOT NULL,
  end_time INTEGER DEFAULT 0 NOT NULL,
  method TEXT NOT NULL,
  resource TEXT NOT NULL,
  http_version TEXT NOT NULL,
  status INTEGER DEFAULT 0 NOT NULL,
  response_size INTEGER DEFAULT 0 NOT NULL,
  user_agent TEXT NOT NULL,
  url_map_entry TEXT DEFAULT '' NOT NULL,
  host TEXT NOT NULL,
  task_queue_name TEXT DEFAULT '' NOT NULL,
  task_name TEXT DEFAULT '' NOT NULL,
  latency INTEGER DEFAULT 0 NOT NULL,
  mcycles INTEGER DEFAULT 0 NOT NULL,
  finished INTEGER DEFAULT 0 NOT NULL
);
"""

# Migration for logs databases written by SDKs that predate the module column;
# applied by LogServiceStub.__init__ when the column is absent.
_REQUEST_LOG_ADD_MODULE_COLUMN = """
ALTER TABLE RequestLogs
  ADD COLUMN module TEXT DEFAULT '%s' NOT NULL;
""" % appinfo.DEFAULT_MODULE

# Schema for application-level log lines, one row per line, keyed back to the
# owning RequestLogs row.
_APP_LOG_CREATE = """
CREATE TABLE IF NOT EXISTS AppLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  request_id INTEGER NOT NULL,
  timestamp INTEGER NOT NULL,
  level INTEGER NOT NULL,
  message TEXT NOT NULL,
  FOREIGN KEY(request_id) REFERENCES RequestLogs(id)
);
"""
class LogServiceStub(apiproxy_stub.APIProxyStub):
  """Python stub for Log Service service."""

  # The API proxy passes an explicit request id to this stub's handlers.
  _ACCEPTS_REQUEST_ID = True

  # Number of request logs returned when the read request gives no count.
  _DEFAULT_READ_COUNT = 20

  # Minimum number of seconds between sqlite commits (see _maybe_commit).
  _MIN_COMMIT_INTERVAL = 5

  def __init__(self, persist=False, logs_path=None, request_data=None):
    """Initializer.

    Args:
      persist: For backwards compatibility. Has no effect.
      logs_path: A str containing the filename to use for logs storage. Defaults
        to in-memory if unset.
      request_data: A apiproxy_stub.RequestData instance used to look up state
        associated with the request that generated an API call.
    """
    super(LogServiceStub, self).__init__('logservice',
                                         request_data=request_data)
    # Maps the API-proxy request id to the RequestLogs rowid for that request.
    self._request_id_to_request_row_id = {}
    if logs_path is None:
      logs_path = ':memory:'
    # check_same_thread=False: handlers run on multiple request threads; the
    # mutating entry points are serialized via apiproxy_stub.Synchronized.
    self._conn = sqlite3.connect(logs_path, check_same_thread=False)
    self._conn.row_factory = sqlite3.Row
    self._conn.execute(_REQUEST_LOG_CREATE)
    self._conn.execute(_APP_LOG_CREATE)

    # Migrate logs databases created by SDKs that predate the module column.
    column_names = set(c['name'] for c in
                       self._conn.execute('PRAGMA table_info(RequestLogs)'))
    if 'module' not in column_names:
      self._conn.execute(_REQUEST_LOG_ADD_MODULE_COLUMN)

    self._last_commit = time.time()
    # Make sure writes still pending at interpreter shutdown reach disk.
    atexit.register(self._conn.commit)

  @staticmethod
  def _get_time_usec():
    """Returns the current wall-clock time in microseconds as an int."""
    return int(time.time() * 1e6)

  def _maybe_commit(self):
    """Commits pending writes, rate-limited to one per _MIN_COMMIT_INTERVAL."""
    now = time.time()
    if (now - self._last_commit) > self._MIN_COMMIT_INTERVAL:
      self._conn.commit()
      self._last_commit = now

  @apiproxy_stub.Synchronized
  def start_request(self, request_id, user_request_id, ip, app_id, version_id,
                    nickname, user_agent, host, method, resource, http_version,
                    start_time=None, module=None):
    """Starts logging for a request.

    Each start_request call must be followed by a corresponding end_request call
    to cleanup resources allocated in start_request.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      user_request_id: A user-visible unique string for retrieving the request
        log at a later time.
      ip: The user's IP address.
      app_id: A string representing the application ID that this request
        corresponds to.
      version_id: A string representing the version ID that this request
        corresponds to.
      nickname: A string representing the user that has made this request (that
        is, the user's nickname, e.g., 'foobar' for a user logged in as
        'foobar@gmail.com').
      user_agent: A string representing the agent used to make this request.
      host: A string representing the host that received this request.
      method: A string containing the HTTP method of this request.
      resource: A string containing the path and query string of this request.
      http_version: A string containing the HTTP version of this request.
      start_time: An int containing the start time in micro-seconds. If unset,
        the current time is used.
      module: The string name of the module handling this request.
    """
    if module is None:
      module = appinfo.DEFAULT_MODULE
    # Only the major version is recorded, e.g. '1' for version_id '1.2345'.
    major_version_id = version_id.split('.', 1)[0]
    if start_time is None:
      start_time = self._get_time_usec()
    cursor = self._conn.execute(
        'INSERT INTO RequestLogs (user_request_id, ip, app_id, version_id, '
        'nickname, user_agent, host, start_time, method, resource, '
        'http_version, module)'
        ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
            user_request_id, ip, app_id, major_version_id, nickname,
            user_agent, host, start_time, method, resource, http_version,
            module))
    # Remember the rowid so end_request/_insert_app_logs can find this row.
    self._request_id_to_request_row_id[request_id] = cursor.lastrowid
    self._maybe_commit()

  @apiproxy_stub.Synchronized
  def end_request(self, request_id, status, response_size, end_time=None):
    """Ends logging for a request.

    Args:
      request_id: A unique string identifying the request associated with the
        API call.
      status: An int containing the HTTP status code for this request.
      response_size: An int containing the content length of the response.
      end_time: An int containing the end time in micro-seconds. If unset, the
        current time is used.
    """
    row_id = self._request_id_to_request_row_id.pop(request_id, None)
    if row_id is None:
      # Unknown request id (start_request never called, or already ended).
      return
    if end_time is None:
      end_time = self._get_time_usec()
    self._conn.execute(
        'UPDATE RequestLogs SET '
        'status = ?, response_size = ?, end_time = ?, finished = 1 '
        'WHERE id = ?', (
            status, response_size, end_time, row_id))
    self._maybe_commit()

  def _Dynamic_Flush(self, request, unused_response, request_id):
    """Writes application-level log messages for a request."""
    group = log_service_pb.UserAppLogGroup(request.logs())
    self._insert_app_logs(request_id, group.log_line_list())

  @apiproxy_stub.Synchronized
  def _insert_app_logs(self, request_id, log_lines):
    """Stores app log lines for the request identified by request_id."""
    row_id = self._request_id_to_request_row_id.get(request_id)
    if row_id is None:
      # Best-effort: silently drop lines for requests we are not tracking.
      return
    new_app_logs = (self._tuple_from_log_line(row_id, log_line)
                    for log_line in log_lines)
    self._conn.executemany(
        'INSERT INTO AppLogs (request_id, timestamp, level, message) VALUES '
        '(?, ?, ?, ?)', new_app_logs)
    self._maybe_commit()

  @staticmethod
  def _tuple_from_log_line(row_id, log_line):
    """Converts a UserAppLogLine proto to a tuple of AppLogs column values."""
    message = log_line.message()
    if isinstance(message, str):
      # Store text; invalid utf-8 byte sequences are replaced rather than
      # raising on insert.
      message = codecs.decode(message, 'utf-8', 'replace')
    return (row_id, log_line.timestamp_usec(), log_line.level(), message)

  @apiproxy_stub.Synchronized
  def _Dynamic_Read(self, request, response, request_id):
    """Handler for LogService.Read; fills response with matching logs."""
    # The request must select logs by module version, version id or request id.
    if (request.module_version_size() < 1 and
        request.version_id_size() < 1 and
        request.request_id_size() < 1):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    # module_version and version_id are mutually exclusive selectors.
    if request.module_version_size() > 0 and request.version_id_size() > 0:
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    # Lookups by request id cannot be combined with time or offset filters.
    if (request.request_id_size() and
        (request.has_start_time() or request.has_end_time() or
         request.has_offset())):
      raise apiproxy_errors.ApplicationError(
          log_service_pb.LogServiceError.INVALID_REQUEST)

    if request.request_id_size():
      for request_id in request.request_id_list():
        log_row = self._conn.execute(
            'SELECT * FROM RequestLogs WHERE user_request_id = ?',
            (request_id,)).fetchone()
        if log_row:
          log = response.add_log()
          self._fill_request_log(log_row, log, request.include_app_logs())
      return

    if request.has_count():
      count = request.count()
    else:
      count = self._DEFAULT_READ_COUNT
    filters, values = self._extract_read_filters(request)
    filter_string = ' WHERE %s' % ' and '.join(filters)

    if request.has_minimum_log_level():
      # Join against AppLogs so the AppLogs.level filter can apply; GROUP BY
      # collapses the join back to one row per request.
      query = ('SELECT * FROM RequestLogs INNER JOIN AppLogs ON '
               'RequestLogs.id = AppLogs.request_id%s GROUP BY '
               'RequestLogs.id ORDER BY id DESC')
    else:
      query = 'SELECT * FROM RequestLogs%s ORDER BY id DESC'
    # Fetch one extra row so we can tell whether more results remain.
    logs = self._conn.execute(query % filter_string,
                              values).fetchmany(count + 1)
    if logging.getLogger(__name__).isEnabledFor(logging.DEBUG):
      self._debug_query(filter_string, values, len(logs))
    for log_row in logs[:count]:
      log = response.add_log()
      self._fill_request_log(log_row, log, request.include_app_logs())
    if len(logs) > count:
      # logs[-2] is the last row actually returned; resume reads after it.
      response.mutable_offset().set_request_id(str(logs[-2]['id']))

  def _debug_query(self, filter_string, values, result_count):
    """Logs the executed filter plus a full dump of RequestLogs (DEBUG only)."""
    logging.debug('\n\n')
    logging.debug(filter_string)
    logging.debug(values)
    logging.debug('%d results.', result_count)
    logging.debug('DB dump:')
    for l in self._conn.execute('SELECT * FROM RequestLogs'):
      logging.debug('%r %r %d %d %s', l['module'], l['version_id'],
                    l['start_time'], l['end_time'],
                    l['finished'] and 'COMPLETE' or 'INCOMPLETE')

  def _fill_request_log(self, log_row, log, include_app_logs):
    """Copies one RequestLogs row into a RequestLog proto.

    Args:
      log_row: A sqlite3.Row read from the RequestLogs table.
      log: The RequestLog proto to fill.
      include_app_logs: If true, also attaches the request's AppLogs rows as
        log lines.
    """
    log.set_request_id(str(log_row['user_request_id']))
    log.set_app_id(log_row['app_id'])
    log.set_version_id(log_row['version_id'])
    log.set_module_id(log_row['module'])
    log.set_ip(log_row['ip'])
    log.set_nickname(log_row['nickname'])
    log.set_start_time(log_row['start_time'])
    log.set_host(log_row['host'])
    log.set_end_time(log_row['end_time'])
    log.set_method(log_row['method'])
    log.set_resource(log_row['resource'])
    log.set_status(log_row['status'])
    log.set_response_size(log_row['response_size'])
    log.set_http_version(log_row['http_version'])
    log.set_user_agent(log_row['user_agent'])
    log.set_url_map_entry(log_row['url_map_entry'])
    log.set_latency(log_row['latency'])
    log.set_mcycles(log_row['mcycles'])
    log.set_finished(log_row['finished'])
    log.mutable_offset().set_request_id(str(log_row['id']))
    # Times are stored in usec; the combined line wants seconds.  An
    # unfinished request (end_time still 0) falls back to its start_time.
    time_seconds = (log_row['end_time'] or log_row['start_time']) / 10**6
    date_string = time.strftime('%d/%b/%Y:%H:%M:%S %z',
                                time.localtime(time_seconds))
    # Apache combined-log style summary line.
    log.set_combined('%s - %s [%s] "%s %s %s" %d %d - "%s"' %
                     (log_row['ip'], log_row['nickname'], date_string,
                      log_row['method'], log_row['resource'],
                      log_row['http_version'], log_row['status'] or 0,
                      log_row['response_size'] or 0, log_row['user_agent']))
    if include_app_logs:
      log_messages = self._conn.execute(
          'SELECT timestamp, level, message FROM AppLogs '
          'WHERE request_id = ?',
          (log_row['id'],)).fetchall()
      for message in log_messages:
        line = log.add_line()
        line.set_time(message['timestamp'])
        line.set_level(message['level'])
        line.set_log_message(message['message'])

  @staticmethod
  def _extract_read_filters(request):
    """Extracts SQL filters from the LogReadRequest.

    Args:
      request: the incoming LogReadRequest.

    Returns:
      a pair of (filters, values); filters is a list of SQL filter expressions,
      to be joined by AND operators; values is a list of values to be
      interpolated into the filter expressions by the db library.
    """
    filters = []
    values = []

    module_filters = []
    module_values = []
    for module_version in request.module_version_list():
      module_filters.append('(version_id = ? AND module = ?)')
      module_values.append(module_version.version_id())
      # An entry without an explicit module targets the default module.
      module = appinfo.DEFAULT_MODULE
      if module_version.has_module_id():
        module = module_version.module_id()
      module_values.append(module)
    if module_filters:
      # Only emit the clause when at least one module version was requested;
      # an unconditional append would produce the invalid SQL fragment '()'.
      filters.append('(' + ' or '.join(module_filters) + ')')
    values += module_values

    if request.has_offset():
      filters.append('RequestLogs.id < ?')
      values.append(int(request.offset().request_id()))
    if request.has_minimum_log_level():
      filters.append('AppLogs.level >= ?')
      values.append(request.minimum_log_level())

    # Time-window filters differ by completion state: finished requests are
    # selected by end_time, unfinished ones by start_time.
    finished_filter = 'finished = 1 '
    finished_filter_values = []
    unfinished_filter = 'finished = 0'
    unfinished_filter_values = []

    if request.has_start_time():
      finished_filter += ' and end_time >= ? '
      finished_filter_values.append(request.start_time())
      unfinished_filter += ' and start_time >= ? '
      unfinished_filter_values.append(request.start_time())
    if request.has_end_time():
      finished_filter += ' and end_time < ? '
      finished_filter_values.append(request.end_time())
      unfinished_filter += ' and start_time < ? '
      unfinished_filter_values.append(request.end_time())

    if request.include_incomplete():
      filters.append(
          '((' + finished_filter + ') or (' + unfinished_filter + '))')
      values += finished_filter_values + unfinished_filter_values
    else:
      filters.append(finished_filter)
      values += finished_filter_values

    return filters, values

  def _Dynamic_SetStatus(self, unused_request, unused_response,
                         unused_request_id):
    """SetStatus is not supported by this stub."""
    raise NotImplementedError

  def _Dynamic_Usage(self, unused_request, unused_response, unused_request_id):
    """Usage reporting is not supported by this stub."""
    raise apiproxy_errors.CapabilityDisabledError('Usage not allowed in tests.')