Forward Port: Force validation of initial slash in request URI
[MonkeyD.git] / qa / checklog
blobf1df314aac06cee3e9de3f71aae7cbd18824d861
1 #!/usr/bin/env python
3 # Copyright (C) 2010, Eduardo Silva <edsiper@gmail.com>
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 import os
20 import sys
21 import re
22 import getopt
23 import ConfigParser
# Presumably the directory holding rule files; it is not referenced anywhere
# else in the visible code -- TODO confirm whether it is still needed.
RULES_PATH = 'log_rules'
# Log files whose last line is compared against the rules.  Both are relative
# paths, so they are resolved against the current working directory.
ACCESS_FILE = '../logs/access.log'
ERROR_FILE = '../logs/error.log'
class AccessRule:
    """Expected field values for one access-log entry.

    Every argument is optional and stored as given, except ``size``:

    * a string starting with ``FILESIZE`` is treated as ``FILESIZE <path>``
      and replaced by the on-disk size (an int) of ``<path>``;
    * anything else (digit string, arbitrary string, int, ``None``) is kept
      unchanged.

    Raises:
        OSError: if a ``FILESIZE`` target does not exist or is unreadable.
    """

    # Keyword that turns the size field into a "size of this file" lookup.
    _FILESIZE_PREFIX = 'FILESIZE'

    def __init__(self,
                 ip=None, time=None,
                 method=None, uri=None,
                 protocol=None, status=None,
                 size=None):
        self.type = 'access'
        self.ip = ip
        self.time = time
        self.method = method
        self.uri = uri
        self.protocol = protocol
        self.status = status

        # Duck-type the prefix test so None and non-string sizes (e.g. ints)
        # pass straight through instead of failing on a missing str method.
        if hasattr(size, 'startswith') and size.startswith(self._FILESIZE_PREFIX):
            target = size[len(self._FILESIZE_PREFIX):].strip()
            size = os.path.getsize(target)
        self.size = size
class ErrorRule:
    """Expected field values for one error-log entry.

    All four arguments are optional and are stored on the instance exactly
    as passed; ``type`` is always the string ``'error'``.
    """

    def __init__(self, ip=None, time=None, error=None, message=None):
        # Tag used by the checker to pick the error-log code path.
        self.type = 'error'
        self.ip, self.time = ip, time
        self.error, self.message = error, message
class Config(ConfigParser.ConfigParser):
    """Rules-file reader built on ConfigParser.

    A rules file may contain an ``[access]`` and/or an ``[error]`` section;
    each is turned into an AccessRule / ErrorRule.  Options missing from a
    section are passed to the rule as ``None``.
    """

    # Option names, in the positional order the rule constructors expect.
    _ACCESS_KEYS = ('ip', 'time', 'method', 'uri', 'protocol', 'status', 'size')
    _ERROR_KEYS = ('ip', 'time', 'error', 'message')

    def __init__(self):
        ConfigParser.ConfigParser.__init__(self)

    def _get_value(self, section, key):
        """Return the option value, or None when it cannot be obtained.

        Only parser errors (missing section/option, bad interpolation) are
        mapped to None; unrelated exceptions such as KeyboardInterrupt are
        no longer swallowed.
        """
        try:
            return self.get(section, key)
        except ConfigParser.Error:
            return None

    def get_rules(self, path):
        """Read ``path`` and return the list of rules it defines.

        Sections other than ``access`` and ``error`` are ignored.  Returns
        an empty list when the file defines no recognised section.
        """
        self.read(path)

        rules = []
        for section in self.sections():
            if section == 'access':
                values = [self._get_value(section, k) for k in self._ACCESS_KEYS]
                rule = AccessRule(*values)
            elif section == 'error':
                values = [self._get_value(section, k) for k in self._ERROR_KEYS]
                rule = ErrorRule(*values)
            else:
                # Unknown section: skip it, so no stale or unbound rule is
                # appended for it.
                continue

            rules.append(rule)

        return rules
class Logfile:
    """Check the newest access/error log entries against a rules file.

    Instantiating the class runs the whole check: it parses the command
    line, loads the rules file given with ``-l`` and compares each rule with
    the last line of the matching log file.

    Process exit codes:
        1 -- usage error, unparsable log line, or a field mismatch
        2 -- rules file missing or containing no rules
    On success the constructor returns normally.
    """

    def __init__(self):
        self.silent_mode = False
        self.target_logfile = None

        self.check_arguments()

        # Check if file exists
        if not os.path.isfile(self.target_logfile):
            # No rules exists for this test
            self._say("No rules for target")
            sys.exit(2)

        # Read rules
        config = Config()
        rules = config.get_rules(self.target_logfile)

        if not rules:
            self._say("Error, no rules found on target file")
            sys.exit(2)

        # Check rules
        self.check_rules(rules)

    def _say(self, message):
        """Print ``message`` unless silent mode (-s) is active."""
        if not self.silent_mode:
            print(message)

    def check_arguments(self):
        """Parse sys.argv; show the help text (exit 1) on any usage problem."""
        try:
            optlist, _args = getopt.getopt(sys.argv[1:], 'shl:')
        except getopt.GetoptError:
            # Unknown option or missing option argument: show usage rather
            # than a traceback.
            self.help()
            return

        for key, val in optlist:
            if key == '-s':
                self.silent_mode = True
            elif key == '-l':
                self.target_logfile = val
            elif key == '-h':
                self.help()

        if self.target_logfile is None:
            self.help()

    def help(self):
        """Print usage information and exit with status 1."""
        print("** Monkey QA Checklog **")
        print("Usage: ./checklog [-s] [-l logfile_rules]")
        print("\nAvailable options")
        print(" -s Run checklog in silent mode, no messages to stdout")
        print(" -l logfile Specify the logfile rule")
        print(" -h Show this help")
        print("")
        sys.exit(1)

    def get_last_file_line(self, file):
        """Return the last line of ``file`` (newline included), or None if empty."""
        last = None
        # Stream the file so only one line is held in memory at a time; the
        # with-block closes the handle even if reading fails.
        with open(file, 'r') as handle:
            for last in handle:
                pass
        return last

    def check_field(self, rule, log):
        """Compare one expected value with the logged one.

        Returns 0 when ``rule`` is None (field not constrained) or when both
        values are equal as strings.  Otherwise reports the mismatch (unless
        silent) and exits with status 1.
        """
        if rule is None or str(rule) == str(log):
            return 0

        # str(log) keeps the report from failing on non-string values.
        self._say("Rule does not match, expect '" + str(rule)
                  + "' got '" + str(log) + "'")
        sys.exit(1)

    def _parse_line(self, regex, line):
        """Match ``line`` against ``regex``; exit 1 when it does not parse.

        ``line`` may be None (empty log file), which counts as unparsable.
        """
        found = regex.match(line) if line is not None else None
        if found is None:
            self._say("Error: we did not find the expected fields")
            self._say("Logfile line")
            self._say(" %s " % line)
            sys.exit(1)
        return found

    def check_rules(self, rules):
        """Check every rule against the last line of its log file.

        Exits with status 1 on the first unparsable line or mismatching
        field; prints a success message (unless silent) when all rules pass.
        """
        # Parse access log format, anyone is invited to fix this nasty regex
        access_re = re.compile(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})( - )(\[.*\])( .* ){1,4}(/.* )(.*/.* )(\d.* )(.*)\n$")
        error_re = re.compile(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})( - )(\[.*\])( \[.*\])(.*)$")

        for r in rules:
            if r.type == 'access':
                line = self.get_last_file_line(ACCESS_FILE)
                found = self._parse_line(access_re, line)

                data = {'ip': found.group(1),
                        'time': found.group(3),
                        'method': found.group(4).strip(),
                        'uri': found.group(5).strip(),
                        'protocol': found.group(6).strip(),
                        'status': found.group(7).strip(),
                        'size': found.group(8),
                        }

                self.check_field(r.ip, data['ip'])
                self.check_field(r.time, data['time'])
                self.check_field(r.method, data['method'])
                self.check_field(r.uri, data['uri'])
                self.check_field(r.protocol, data['protocol'])
                self.check_field(r.status, data['status'])
                self.check_field(r.size, data['size'])

            elif r.type == 'error':
                line = self.get_last_file_line(ERROR_FILE)
                found = self._parse_line(error_re, line)

                data = {'ip': found.group(1),
                        'time': found.group(3),
                        # NOTE: str.strip() takes a *set of characters*, not
                        # a pattern; this removes any of "[error (\d)]" from
                        # both ends, leaving e.g. "404" from " [error 404]".
                        'error': found.group(4).strip(r'[error (\d)]'),
                        'message': found.group(5).strip(),
                        }

                self.check_field(r.ip, data['ip'])
                self.check_field(r.time, data['time'])
                self.check_field(r.error, data['error'])
                self.check_field(r.message, data['message'])

        self._say("Check passed :)")
# Run the check only when executed as a script.  Constructing Logfile performs
# the whole run: argument parsing, rule loading and the log comparison.
if __name__ == '__main__':
    Logfile()