3 # Copyright 2007 Google Inc.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 """Stub implementation for Log Service that uses sqlite."""
24 from google
.appengine
.api
import apiproxy_stub
25 from google
.appengine
.api
.logservice
import log_service_pb
26 from google
.appengine
.runtime
import apiproxy_errors
30 _REQUEST_LOG_CREATE
= """
31 CREATE TABLE IF NOT EXISTS RequestLogs (
32 id INTEGER NOT NULL PRIMARY KEY,
33 user_request_id TEXT NOT NULL,
35 version_id TEXT NOT NULL,
37 nickname TEXT NOT NULL,
38 start_time INTEGER NOT NULL,
39 end_time INTEGER DEFAULT 0 NOT NULL,
41 resource TEXT NOT NULL,
42 http_version TEXT NOT NULL,
43 status INTEGER DEFAULT 0 NOT NULL,
44 response_size INTEGER DEFAULT 0 NOT NULL,
45 user_agent TEXT NOT NULL,
46 url_map_entry TEXT DEFAULT '' NOT NULL,
48 task_queue_name TEXT DEFAULT '' NOT NULL,
49 task_name TEXT DEFAULT '' NOT NULL,
50 latency INTEGER DEFAULT 0 NOT NULL,
51 mcycles INTEGER DEFAULT 0 NOT NULL,
52 finished INTEGER DEFAULT 0 NOT NULL
57 CREATE TABLE IF NOT EXISTS AppLogs (
58 id INTEGER NOT NULL PRIMARY KEY,
59 request_id INTEGER NOT NULL,
60 timestamp INTEGER NOT NULL,
61 level INTEGER NOT NULL,
62 message TEXT NOT NULL,
63 FOREIGN KEY(request_id) REFERENCES RequestLogs(id)
68 class LogServiceStub(apiproxy_stub
.APIProxyStub
):
69 """Python stub for Log Service service."""
71 _ACCEPTS_REQUEST_ID
= True
74 _DEFAULT_READ_COUNT
= 20
77 _MIN_COMMIT_INTERVAL
= 5
79 def __init__(self
, persist
=False, logs_path
=None, request_data
=None):
83 persist: For backwards compatability. Has no effect.
84 logs_path: A str containing the filename to use for logs storage. Defaults
85 to in-memory if unset.
86 request_data: A apiproxy_stub.RequestData instance used to look up state
87 associated with the request that generated an API call.
90 super(LogServiceStub
, self
).__init
__('logservice',
91 request_data
=request_data
)
92 self
._request
_id
_to
_request
_row
_id
= {}
94 logs_path
= ':memory:'
95 self
._conn
= sqlite3
.connect(logs_path
, check_same_thread
=False)
96 self
._conn
.row_factory
= sqlite3
.Row
97 self
._conn
.execute(_REQUEST_LOG_CREATE
)
98 self
._conn
.execute(_APP_LOG_CREATE
)
99 self
._last
_commit
= time
.time()
100 atexit
.register(self
._conn
.commit
)
103 def _get_time_usec():
104 return int(time
.time() * 1e6
)
106 def _maybe_commit(self
):
108 if (now
- self
._last
_commit
) > self
._MIN
_COMMIT
_INTERVAL
:
110 self
._last
_commit
= now
112 @apiproxy_stub.Synchronized
113 def start_request(self
, request_id
, user_request_id
, ip
, app_id
, version_id
,
114 nickname
, user_agent
, host
, method
, resource
, http_version
,
116 """Starts logging for a request.
118 Each start_request call must be followed by a corresponding end_request call
119 to cleanup resources allocated in start_request.
122 request_id: A unique string identifying the request associated with the
124 user_request_id: A user-visible unique string for retrieving the request
126 ip: The user's IP address.
127 app_id: A string representing the application ID that this request
129 version_id: A string representing the version ID that this request
131 nickname: A string representing the user that has made this request (that
132 is, the user's nickname, e.g., 'foobar' for a user logged in as
134 user_agent: A string representing the agent used to make this request.
135 host: A string representing the host that received this request.
136 method: A string containing the HTTP method of this request.
137 resource: A string containing the path and query string of this request.
138 http_version: A string containing the HTTP version of this request.
139 start_time: An int containing the start time in micro-seconds. If unset,
140 the current time is used.
142 major_version_id
= version_id
.split('.', 1)[0]
143 if start_time
is None:
144 start_time
= self
._get
_time
_usec
()
145 cursor
= self
._conn
.execute(
146 'INSERT INTO RequestLogs (user_request_id, ip, app_id, version_id, '
147 'nickname, user_agent, host, start_time, method, resource, '
149 ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (
150 user_request_id
, ip
, app_id
, major_version_id
, nickname
, user_agent
,
151 host
, start_time
, method
, resource
, http_version
))
152 self
._request
_id
_to
_request
_row
_id
[request_id
] = cursor
.lastrowid
155 @apiproxy_stub.Synchronized
156 def end_request(self
, request_id
, status
, response_size
, end_time
=None):
157 """Ends logging for a request.
160 request_id: A unique string identifying the request associated with the
162 status: An int containing the HTTP status code for this request.
163 response_size: An int containing the content length of the response.
164 end_time: An int containing the end time in micro-seconds. If unset, the
165 current time is used.
167 row_id
= self
._request
_id
_to
_request
_row
_id
.pop(request_id
, None)
171 end_time
= self
._get
_time
_usec
()
173 'UPDATE RequestLogs SET '
174 'status = ?, response_size = ?, end_time = ?, finished = 1 '
176 status
, response_size
, end_time
, row_id
))
179 def _Dynamic_Flush(self
, request
, unused_response
, request_id
):
180 """Writes application-level log messages for a request."""
181 group
= log_service_pb
.UserAppLogGroup(request
.logs())
182 self
._insert
_app
_logs
(request_id
, group
.log_line_list())
184 @apiproxy_stub.Synchronized
185 def _insert_app_logs(self
, request_id
, log_lines
):
186 row_id
= self
._request
_id
_to
_request
_row
_id
.get(request_id
)
189 new_app_logs
= (self
._tuple
_from
_log
_line
(row_id
, log_line
)
190 for log_line
in log_lines
)
191 self
._conn
.executemany(
192 'INSERT INTO AppLogs (request_id, timestamp, level, message) VALUES '
193 '(?, ?, ?, ?)', new_app_logs
)
197 def _tuple_from_log_line(row_id
, log_line
):
198 message
= log_line
.message()
199 if isinstance(message
, str):
200 message
= message
.decode('utf-8')
201 return (row_id
, log_line
.timestamp_usec(), log_line
.level(), message
)
203 @apiproxy_stub.Synchronized
204 def _Dynamic_Read(self
, request
, response
, request_id
):
205 if (request
.server_version_size() < 1 and
206 request
.version_id_size() < 1 and
207 request
.request_id_size() < 1):
208 raise apiproxy_errors
.ApplicationError(
209 log_service_pb
.LogServiceError
.INVALID_REQUEST
)
211 if request
.server_version_size() > 0 and request
.version_id_size() > 0:
212 raise apiproxy_errors
.ApplicationError(
213 log_service_pb
.LogServiceError
.INVALID_REQUEST
)
215 if (request
.request_id_size() and
216 (request
.has_start_time() or request
.has_end_time() or
217 request
.has_offset())):
218 raise apiproxy_errors
.ApplicationError(
219 log_service_pb
.LogServiceError
.INVALID_REQUEST
)
221 if request
.request_id_size():
222 for request_id
in request
.request_id_list():
223 log_row
= self
._conn
.execute(
224 'SELECT * FROM RequestLogs WHERE user_request_id = ?',
225 (request_id
,)).fetchone()
227 log
= response
.add_log()
228 self
._fill
_request
_log
(log_row
, log
, request
.include_app_logs())
231 if request
.has_count():
232 count
= request
.count()
234 count
= self
._DEFAULT
_READ
_COUNT
235 filters
= self
._extract
_read
_filters
(request
)
236 filter_string
= ' WHERE %s' % ' and '.join(f
[0] for f
in filters
)
238 if request
.has_minimum_log_level():
239 query
= ('SELECT * FROM RequestLogs INNER JOIN AppLogs ON '
240 'RequestLogs.id = AppLogs.request_id%s GROUP BY '
241 'RequestLogs.id ORDER BY id DESC')
243 query
= 'SELECT * FROM RequestLogs%s ORDER BY id DESC'
244 logs
= self
._conn
.execute(query
% filter_string
,
245 tuple(f
[1] for f
in filters
)).fetchmany(count
+ 1)
246 for log_row
in logs
[:count
]:
247 log
= response
.add_log()
248 self
._fill
_request
_log
(log_row
, log
, request
.include_app_logs())
249 if len(logs
) > count
:
250 response
.mutable_offset().set_request_id(str(logs
[-2]['id']))
252 def _fill_request_log(self
, log_row
, log
, include_app_logs
):
253 log
.set_request_id(str(log_row
['user_request_id']))
254 log
.set_app_id(log_row
['app_id'])
255 log
.set_version_id(log_row
['version_id'])
256 log
.set_ip(log_row
['ip'])
257 log
.set_nickname(log_row
['nickname'])
258 log
.set_start_time(log_row
['start_time'])
259 log
.set_host(log_row
['host'])
260 log
.set_end_time(log_row
['end_time'])
261 log
.set_method(log_row
['method'])
262 log
.set_resource(log_row
['resource'])
263 log
.set_status(log_row
['status'])
264 log
.set_response_size(log_row
['response_size'])
265 log
.set_http_version(log_row
['http_version'])
266 log
.set_user_agent(log_row
['user_agent'])
267 log
.set_url_map_entry(log_row
['url_map_entry'])
268 log
.set_latency(log_row
['latency'])
269 log
.set_mcycles(log_row
['mcycles'])
270 log
.set_finished(log_row
['finished'])
271 log
.mutable_offset().set_request_id(str(log_row
['id']))
272 time_seconds
= (log_row
['end_time'] or log_row
['start_time']) / 10**6
273 date_string
= time
.strftime('%d/%b/%Y:%H:%M:%S %z',
274 time
.localtime(time_seconds
))
275 log
.set_combined('%s - %s [%s] "%s %s %s" %d %d - "%s"' %
276 (log_row
['ip'], log_row
['nickname'], date_string
,
277 log_row
['method'], log_row
['resource'],
278 log_row
['http_version'], log_row
['status'] or 0,
279 log_row
['response_size'] or 0, log_row
['user_agent']))
281 log_messages
= self
._conn
.execute(
282 'SELECT timestamp, level, message FROM AppLogs '
283 'WHERE request_id = ?',
284 (log_row
['id'],)).fetchall()
285 for message
in log_messages
:
286 line
= log
.add_line()
287 line
.set_time(message
['timestamp'])
288 line
.set_level(message
['level'])
289 line
.set_log_message(message
['message'])
292 def _extract_read_filters(request
):
295 if request
.server_version(0).has_server_id():
296 server_version
= ':'.join([request
.server_version(0).server_id(),
297 request
.server_version(0).version_id()])
299 server_version
= request
.server_version(0).version_id()
300 filters
= [('version_id = ?', server_version
)]
301 if request
.has_start_time():
302 filters
.append(('start_time >= ?', request
.start_time()))
303 if request
.has_end_time():
304 filters
.append(('end_time < ?', request
.end_time()))
305 if request
.has_offset():
306 filters
.append(('RequestLogs.id < ?', int(request
.offset().request_id())))
307 if not request
.include_incomplete():
308 filters
.append(('finished = ?', 1))
309 if request
.has_minimum_log_level():
310 filters
.append(('AppLogs.level >= ?', request
.minimum_log_level()))
313 def _Dynamic_SetStatus(self
, unused_request
, unused_response
,
315 raise NotImplementedError
317 def _Dynamic_Usage(self
, unused_request
, unused_response
, unused_request_id
):
318 raise apiproxy_errors
.CapabilityDisabledError('Usage not allowed in tests.')