Refactoring: Moved check parameters from unsorted.py to dedicated modules (CMK-1393)
[check_mk.git] / doc / helpers / guitest
blobe6b85509a92e8c67d0f08482cabc15b4b7cbcab0
1 #!/usr/bin/python
2 # encoding: utf-8
4 import os, sys, getopt, pprint, time, requests, re
6 # .--GUI Test------------------------------------------------------------.
7 # | ____ _ _ ___ _____ _ |
8 # | / ___| | | |_ _| |_ _|__ ___| |_ |
9 # | | | _| | | || | | |/ _ \/ __| __| |
10 # | | |_| | |_| || | | | __/\__ \ |_ |
11 # | \____|\___/|___| |_|\___||___/\__| |
12 # | |
13 # +----------------------------------------------------------------------+
14 # | Actual code for running the tests |
15 # '----------------------------------------------------------------------'
def tests_dir():
    """Return the directory that holds the GUI test files, creating it if needed.

    The path is built from omd_root() and always ends with a slash, so callers
    can append a file name directly.
    """
    directory = omd_root() + "/var/check_mk/guitests/"
    if os.path.exists(directory):
        return directory
    os.makedirs(directory)
    return directory
def recording_symlink():
    """Return the path of the "RECORD" entry inside the tests directory."""
    base_dir = tests_dir()
    return base_dir + "RECORD"
def run_guitests(args):
    """Run the given tests (or all known tests if none are given), then exit.

    Execution stops at the first test that fails. When opt_repair is set,
    each test is wrapped in set_recording_symlink() / stop_recording().

    This function never returns normally: it exits the process with status 0
    when every executed test succeeded and with status 1 otherwise.
    """
    if not args:
        args = names_of_all_tests()

    sys.stdout.write("Going to run %d tests%s...\n" %
                     (len(args), " in repair mode" if opt_repair else ""))

    # Pre-set the result so that an empty test list is reported as
    # "All 0 tests succeeded" instead of failing on an unbound variable.
    succeeded = True
    num_successful = 0
    for test_name in args:
        if opt_repair:
            set_recording_symlink(test_name)
        succeeded = run_guitest(test_name)
        if opt_repair:
            stop_recording()
        if not succeeded:
            break
        num_successful += 1

    if succeeded:
        sys.stdout.write("%sAll %d tests succeeded.%s\n" % (tty_green, num_successful, tty_normal))
        sys.exit(0)

    sys.stdout.write("%d tests succeeded, but last one %sfailed.%s\n" % (num_successful, tty_red,
                                                                        tty_normal))
    sys.exit(1)
def run_guitest(test_name):
    """Run the steps of one test and report the progress on stdout.

    Only steps whose index lies within range_from..range_to (set by
    load_guitest()) are executed. A failing step aborts the test unless it is
    repaired interactively (opt_interactive) or opt_continue is set.

    Returns True if the test ran through, False if it was aborted by a failed
    step or by an exception. With opt_debug set, exceptions are re-raised
    instead of being reported.
    """
    try:
        sys.stdout.write(" %s%s%s:\n" % (tty_yellow, test_name, tty_normal))
        steps = load_guitest(test_name)

        for step_nr, step in enumerate(steps):
            if step_nr < range_from:
                continue
            if step_nr > range_to:
                break

            sys.stdout.write(" #%02d - %s..." % (step_nr, step_title(step)))
            errors = run_test_step(test_name, step_nr, step, opt_repair)
            if not errors:
                sys.stdout.write(tty_green + "OK\n" + tty_normal)
                continue

            sys.stdout.write(tty_red + "FAILED\n" + tty_normal)
            for message in errors:
                sys.stdout.write(" %s\n" % message)

            replay_url = rule_step_url(test_name, step_nr)
            sys.stdout.write(" Replay this step with: %s%s%s\n" % (tty_blue, replay_url,
                                                                  tty_normal))

            # A successful interactive repair counts as a passed step.
            if opt_interactive and interactive_repair(test_name, step_nr, step):
                continue

            if not opt_continue:
                return False

        sys.stdout.write("\n")
        return True

    except Exception as e:
        if opt_debug:
            raise
        sys.stdout.write(" %sfailed%s (%s)\n" % (tty_red, tty_normal, e))
        return False
def interactive_repair(test_name, step_nr, step):
    """Ask the user whether a failed step should be re-run in repair mode.

    Keeps asking until the answer is "y" or "n". On "y" the step is executed
    again with repair enabled while the recording symlink points to the test.

    Returns True only if the user chose to repair and the re-run produced no
    errors; False if the user declined, the prompt was aborted, or the re-run
    still reported errors.
    """
    answer = None
    while answer not in ("y", "n"):
        answer = read_line("Repair this test step (y/n)?")
        if answer is None:
            # read_line() signals an aborted prompt with None; treat it as
            # "no" rather than crashing on None.strip().
            return False
        answer = answer.strip()

    if answer != "y":
        return False

    sys.stdout.write("Rerunning this step #%02d in repair mode...\n" % step_nr)
    set_recording_symlink(test_name)
    try:
        errors = run_test_step(test_name, step_nr, step, True)
    finally:
        # Never leave the recording symlink behind, even if the step raised.
        remove_recording_symlink()

    for error in errors or []:
        sys.stdout.write(" %s\n" % error)
    return not errors
def read_line(prompt, echo=True):
    """Prompt on stderr and read one line of user input.

    Args:
        prompt: Text shown before the input (followed by a space).
        echo: If true, read a visible line from stdin (trailing whitespace
            removed); otherwise read hidden input via getpass.

    Returns:
        The entered text, or None if reading failed or was interrupted.
    """
    # Imported here because the module's import line does not provide it;
    # without this the echo=False branch always failed with a NameError that
    # the handler below silently turned into None.
    import getpass

    sys.stderr.write(prompt + " " + tty_bold)
    sys.stderr.flush()
    try:
        if echo:
            answer = sys.stdin.readline().rstrip()
        else:
            answer = getpass.getpass(prompt="")
    except (Exception, KeyboardInterrupt):
        # Best effort: Ctrl-C or a broken terminal just yields "no answer".
        sys.stderr.write("\n")
        answer = None

    sys.stderr.write(tty_normal)
    return answer
def load_guitest(test_name):
    """Load the steps of a test and set the active step range.

    Reads "<tests_dir><test_name>.mk" and evaluates its content as a Python
    expression. As a side effect the module globals range_from and range_to
    are set from opt_steps_range; an open upper bound (None) becomes the index
    of the last step of the loaded test.

    Raises:
        Exception: if the test file does not exist.
    """
    global range_from, range_to

    path = tests_dir() + test_name + ".mk"
    if not os.path.exists(path):
        raise Exception("Test not found (missing file %s)" % path)

    # open() in a with-block replaces the Python-2-only file() builtin and
    # closes the handle deterministically.
    with open(path) as test_file:
        # NOTE(review): eval() executes whatever the file contains. The files
        # are written by save_guitest() via pprint, so ast.literal_eval() is
        # probably sufficient - confirm before pointing this at untrusted files.
        guitest = eval(test_file.read())

    range_from = opt_steps_range[0]
    if opt_steps_range[1] is None:
        range_to = len(guitest) - 1
    else:
        range_to = opt_steps_range[1]

    return guitest
def save_guitest(test_name, guitest):
    """Write the steps of a test to "<tests_dir><test_name>.mk".

    The data is stored pretty-printed (pprint.pformat) followed by a newline,
    which is the format load_guitest() evaluates.
    """
    path = tests_dir() + test_name + ".mk"
    # The with-block guarantees the data is flushed and the handle closed,
    # instead of relying on garbage collection of an anonymous file object.
    with open(path, "w") as test_file:
        test_file.write("%s\n" % pprint.pformat(guitest))
def names_of_all_tests():
    """Return the sorted names of all tests, i.e. the "*.mk" files without suffix."""
    names = []
    for entry in os.listdir(tests_dir()):
        if entry.endswith(".mk"):
            names.append(entry[:-3])
    names.sort()
    return names
def run_test_step(test_name, step_nr, step, repair):
    """Execute one step of a test and return the list of error lines.

    A step containing the key "wait" only sleeps. Every other step is replayed
    by POSTing test name, step number and repair flag to the guitest.py page
    of the local site.

    Returns:
        A list of error message lines; the list is empty if the step succeeded.
    """
    if "wait" in step:
        do_wait(step["wait"])
        return []

    url = "http://localhost/%s/check_mk/guitest.py" % omd_site()
    response = requests.post(
        url, data={
            "test": test_name,
            "step": str(step_nr),
            "repair": "1" if repair else "0",
        })

    # Best effort decoding: fall back to the text as delivered by requests.
    # "except Exception" keeps Ctrl-C working, unlike a bare except.
    try:
        response_text = response.text.decode(response.encoding)
    except Exception:
        response_text = response.text

    if "<pre>\nMOD_PYTHON ERROR\n\n" in response_text:
        # Cut off everything from the module cache details on, if present.
        end_offset = response_text.find("MODULE CACHE DETAILS")
        if end_offset != -1:
            response_text = response_text[:end_offset]
        return ["MOD PYTHON ERROR: %s" % response_text]

    if response.status_code != requests.codes.ok:
        errors = ["HTTP Response is not %s but %s" % (requests.codes.ok, response.status_code)]
        errors += response_text.splitlines()
        return errors

    marker = '[[[GUITEST FAILED]]]'
    error_offset = response_text.find(marker)
    if error_offset != -1:
        # Skip the marker plus one more character (21 in total) - presumably
        # the line break that follows the marker.
        error_info = response_text[error_offset + len(marker) + 1:]
        return hilite_keywords(error_info).splitlines()

    # Explicit empty list, consistent with the wait-step branch above
    # (this path used to fall through and return None).
    return []
def do_wait(seconds):
    """Sleep for the given number of seconds, printing a dot every 0.2 seconds.

    The output is switched to blue (tty_blue) before the dots are printed.
    """
    sys.stdout.write(tty_blue)
    remaining = seconds
    while remaining > 0:
        sys.stdout.write(".")
        sys.stdout.flush()
        # The last slice may be shorter than a full 0.2 second tick.
        time.sleep(min(remaining, 0.2))
        remaining -= 0.2
def hilite_keywords(text):
    """Colorize the keywords and bracketed values of an error text.

    " expected " is wrapped in green and " got " in red. In addition, a value
    enclosed in [[[...]]] after " expected " / " got " is printed in the
    respective color with the brackets removed.
    """
    highlighted = text.replace(" expected ", tty_green + " expected " + tty_normal)
    highlighted = highlighted.replace(" got ", tty_red + " got " + tty_normal)

    expected_replacement = r" expected \1" + tty_green + r"\2" + tty_normal
    highlighted = re.sub(
        r" expected (.*?)\[\[\[([^]]*?)\]\]\]",
        expected_replacement,
        highlighted,
        flags=re.DOTALL)

    got_replacement = r" got \1" + tty_red + r"\2" + tty_normal
    highlighted = re.sub(
        r" got (.*?)\[\[\[(.*?)\]\]\]",
        got_replacement,
        highlighted,
        flags=re.DOTALL)

    return highlighted
def list_guitests(test_names):
    """Print the given tests (or all tests if none are given) with their steps."""
    selected = test_names if test_names else names_of_all_tests()
    for test_name in selected:
        sys.stdout.write("%s\n" % (tty_yellow + test_name + tty_normal))
        steps = load_guitest(test_name)
        for step_nr, step in enumerate(steps):
            sys.stdout.write(" #%02d - %s\n" % (step_nr, step_title(step)))
        # Blank line as separator after each test.
        sys.stdout.write("\n")
def step_title(step):
    """Return a one-line, colorized description of a test step.

    Wait steps (key "wait") are rendered as "Waiting for N ms", indented by
    41 columns. All other steps show user, page file name, an action marker
    derived from the "_transid" variable and the recorded page title.
    """
    if "wait" in step:
        return (" " * 41) + "%sWaiting for %d ms%s" % (tty_blue, step["wait"] * 1000.0, tty_normal)

    if "page_title" in step["output"]:
        page_title = step["output"]["page_title"][0]
    else:
        page_title = "(no page title)"

    transid = step["variables"].get("_transid")
    if transid == "valid":
        action_marker = "[%sA%s] " % (tty_red, tty_normal)
    elif transid == "invalid":
        action_marker = "[X] "
    else:
        # Same visible width (4) as the two markers above, so that the page
        # titles line up: 9 + 1 + 26 + 1 + 4 = 41, the indent of wait steps.
        action_marker = "    "

    return "%s%-9s%s %-26s %s%s" % (tty_cyan, step["user"], tty_normal, step["filename"] + ".py",
                                    action_marker, tty_bold + page_title + tty_normal)
def rule_step_url(test_name, step_nr):
    """Return the guitest.py URL on localhost that replays one step of a test."""
    url_template = "http://localhost/%s/check_mk/guitest.py?test=%s&step=%d"
    return url_template % (omd_site(), test_name, step_nr)
def start_stop_recording(args):
    """Toggle recording: stop a running recording, otherwise start a new one.

    Args:
        args: Command line arguments; the first one is the name of the test
            to record into. It is only needed when a recording is started.
    """
    if recording_symlink_exists():
        stop_recording()
        return

    if not args:
        # Give a usable message instead of an IndexError on args[0].
        bail_out("Please specify the name of the test to record.")
    start_recording(args[0])
def start_recording(test_name):
    """Point the recording symlink to the given test and report what happens.

    os.path.exists() follows the symlink, so it tells whether the test file
    already exists (recording appends) or not (a new test is recorded).
    """
    set_recording_symlink(test_name)
    if not os.path.exists(recording_symlink()):
        sys.stdout.write("Started recording into new test %s.\n" % test_name)
        return

    existing_steps = len(load_guitest(test_name))
    sys.stdout.write("Started recording, appending to existing test %s (%d steps exist).\n" %
                     (test_name, existing_steps))
def set_recording_symlink(test_name):
    """(Re)create the recording symlink so that it points to "<test_name>.mk"."""
    if recording_symlink_exists():
        # Replace a symlink left over from a previous recording.
        os.remove(recording_symlink())
    link_target = test_name + ".mk"
    os.symlink(link_target, recording_symlink())
def recording_symlink_exists():
    """Return whether the recording symlink exists, even if its target is missing."""
    link_path = recording_symlink()
    return os.path.lexists(link_path)
def remove_recording_symlink():
    """Delete the recording symlink."""
    link_path = recording_symlink()
    os.remove(link_path)
def stop_recording():
    """Remove the recording symlink and report the result of the recording.

    The test name is taken from the link target (without its 3-character
    suffix). If the target file does not exist, the recording is reported as
    aborted; otherwise the number of steps in the test is printed.
    """
    test_name = os.readlink(recording_symlink())[:-3]
    # exists() follows the link: False means the test file was never written.
    target_missing = not os.path.exists(recording_symlink())
    if target_missing:
        sys.stdout.write("Aborted recording. Test %s not created.\n" % test_name)
        remove_recording_symlink()
        return

    remove_recording_symlink()
    recorded_steps = load_guitest(test_name)
    sys.stdout.write("Stopped recording test %s (%d steps).\n" % (test_name,
                                                                 len(recorded_steps)))
def extract_from_test(args):
    """Extract the selected range of steps from a test.

    Args:
        args: [FROM_TEST] or [FROM_TEST, TO_TEST]. With TO_TEST the extracted
            steps are saved as that test, otherwise they are pretty-printed
            to stdout.

    The range is the inclusive range range_from..range_to that load_guitest()
    derives from opt_steps_range.
    """
    if len(args) < 1:
        bail_out("Missing name of test to extract from.")
    elif len(args) > 2:
        bail_out("You can specify at most one test to extract from.")

    from_test_name = args[0]
    to_test_name = args[1] if len(args) > 1 else None

    guitest = load_guitest(from_test_name)

    # range_to is inclusive (see the RANGES help text and
    # delete_steps_from_test), so the slice must end one past it. The
    # previous slice silently dropped the last requested step.
    extracted_steps = guitest[range_from:range_to + 1]
    if to_test_name:
        save_guitest(to_test_name, extracted_steps)
    else:
        sys.stdout.write("%s\n" % pprint.pformat(extracted_steps))
def delete_steps_from_test(args):
    """Remove the steps range_from..range_to (inclusive) from a test and save it.

    Args:
        args: Exactly one element, the name of the test to modify.
    """
    if len(args) != 1:
        bail_out("Please specify exactly one test to delete steps from.")

    test_name = args[0]
    steps = load_guitest(test_name)
    first, last = range_from, range_to
    del steps[first:last + 1]
    save_guitest(test_name, steps)
def add_wait_step(args):
    """Insert a wait step after a given step of a test and save the test.

    Args:
        args: [STEP, MS, TEST] - step number to insert after, duration in
            milliseconds and name of the test.
    """
    if len(args) != 3:
        bail_out(
            "Please specify the test, the step number to add the wait after and the duration in ms")

    test_name = args[2]
    steps = load_guitest(test_name)
    insert_at = int(args[0]) + 1
    # The duration is stored in seconds.
    wait_step = {"wait": int(args[1]) / 1000.0}
    steps.insert(insert_at, wait_step)
    save_guitest(test_name, steps)
def add_reschedule_step(args):
    """Insert a "reschedule all" step after a given step of a test and save it.

    Args:
        args: [STEP, TEST] - step number to insert after and name of the test.
    """
    if len(args) != 2:
        bail_out("Please specify the test and the step number to add the reschdule after")

    test_name = args[1]
    steps = load_guitest(test_name)
    insert_at = int(args[0]) + 1

    # Prefabricated step that replays the guitest_reschedule_all page.
    reschedule_step = {
        'elapsed_time': 0.01,
        'filename': 'guitest_reschedule_all',
        'output': {
            'message': [('message', 'All hosts are checked.\n'),
                        ('message', 'All services are checked.\n')],
            'page_title': ['Rescheduling and waiting for check results']
        },
        'user': u'omdadmin',
        'variables': {},
    }
    steps.insert(insert_at, reschedule_step)
    save_guitest(test_name, steps)
356 # .-Helpers--------------------------------------------------------------.
357 # | _ _ _ |
358 # | | | | | ___| |_ __ ___ _ __ ___ |
359 # | | |_| |/ _ \ | '_ \ / _ \ '__/ __| |
360 # | | _ | __/ | |_) | __/ | \__ \ |
361 # | |_| |_|\___|_| .__/ \___|_| |___/ |
362 # | |_| |
363 # +----------------------------------------------------------------------+
364 # | Various helper functions |
365 # '----------------------------------------------------------------------'
# ANSI escape sequences for colored output. They are only used when stdout is
# a terminal and we are not running on Windows; otherwise all of them are empty
# strings so that they can be embedded into messages unconditionally.
if sys.stdout.isatty() and os.name != "nt":
    tty_bold = '\033[1m'
    tty_normal = '\033[0m'
    tty_red = tty_bold + '\033[31m'
    tty_green = tty_bold + '\033[32m'
    tty_yellow = tty_bold + '\033[33m'
    tty_cyan = tty_bold + '\033[36m'
    tty_blue = tty_bold + '\033[34m'
else:
    tty_bold = tty_normal = ""
    tty_red = tty_green = tty_yellow = tty_cyan = tty_blue = ""
def omd_root():
    """Return the value of the environment variable OMD_ROOT (None if unset)."""
    return os.environ.get("OMD_ROOT")
def omd_site():
    """Return the value of the environment variable OMD_SITE (None if unset)."""
    return os.environ.get("OMD_SITE")
def verbose(x):
    """Write x followed by a newline to stderr, but only if opt_verbose is set."""
    if not opt_verbose:
        return
    sys.stderr.write("%s\n" % x)
def bail_out(x):
    """Write x followed by a newline to stderr and exit with status 1."""
    sys.stderr.write("%s\n" % x)
    raise SystemExit(1)
def parse_range_argument(a):
    """Parse a step range given on the command line.

    Accepted forms are "N" (just step N) and "FIRST:LAST", where an omitted
    FIRST means 0 and an omitted LAST means "until the end".

    Returns:
        A tuple (first_step, last_step); last_step is None for an open end.
    """
    if ":" not in a:
        single_step = int(a)
        return single_step, single_step

    first_text, last_text = a.split(":")
    first_step = int(first_text) if first_text else 0
    last_step = int(last_text) if last_text else None
    return first_step, last_step
def usage():
    """Print the command line help to stderr."""
    # Wording fixes: "Insert and artificial" -> "Insert an artificial".
    sys.stderr.write("""
Usage: cmk-guitest [OPTIONS] MODE [ARGS...]

OPERATION MODES

  (no option) [TESTS...]               Run some or all tests
  -R, --repair                         Run a test and at the same time rerecord it
  -r, --record                         Start/stop recording test TEST
  -l, --list-tests [TEST]              List one or all tests
  -x, --extract RANGE TEST [OUTPUT]    Extract a range of steps from a test to stdout
  -D, --delete RANGE TEST              Remove a range of steps from a test
  -W, --add-wait-step STEP MS TEST     Insert an artificial wait phase into the test
  -E, --add-reschedule-step STEP TEST  Insert an artificial "reschedule all checks" step
  -h, --help                           Show this crufty help

OPTIONS

  -v, --verbose                        Output debug information on stderr
  --debug                              Do not catch Python exceptions
  -S, --steps RANGE                    Just run steps START...END (1st step is 0)
  -i, --interactive                    When tests fail ask for automatic repair
  -c, --continue                       Continue after errors

RANGES

  0                                    Just the step 0
  2:5                                  Steps 2, 3, 4 and 5
  :5                                   All steps from 0 through 5
  2:                                   All steps from 2 until end of the test

""")
458 # .-main-----------------------------------------------------------------.
459 # | _ |
460 # | _ __ ___ __ _(_)_ __ |
461 # | | '_ ` _ \ / _` | | '_ \ |
462 # | | | | | | | (_| | | | | | |
463 # | |_| |_| |_|\__,_|_|_| |_| |
464 # | |
465 # +----------------------------------------------------------------------+
466 # | Main entry point, getopt, etc. |
467 # '----------------------------------------------------------------------'
# NOTE(review): the short option "s" is accepted but has no handler below, so
# it is silently ignored - confirm whether it can be dropped.
short_options = 'hvrslRS:x:D:icWE'
# "list-tests" takes no option argument, matching the short form "-l": the
# tests to list are passed as positional arguments. It was declared as
# "list-tests=" before, which made "--list-tests" without a value an error and
# swallowed the first test name otherwise.
long_options = [
    "help", "debug", "verbose", "list-tests", "record", "repair", "steps=", "extract=", "delete=",
    "interactive", "continue", "add-wait-step", "add-reschedule-step"
]

# Defaults of the global options read by the functions above
opt_verbose = False
opt_debug = False
opt_repair = False
opt_steps_range = 0, None  # (first step, last step); None = until the end
opt_interactive = False
opt_continue = False

try:
    opts, args = getopt.getopt(sys.argv[1:], short_options, long_options)
except getopt.GetoptError as err:
    sys.stderr.write("%s\n\n" % err)
    usage()
    sys.exit(1)

# Without an explicit mode option the tests are executed
mode_function = run_guitests

for o, a in opts:
    # modes
    if o in ['-h', '--help']:
        usage()
        sys.exit(0)
    elif o in ['-l', '--list-tests']:
        mode_function = list_guitests
    elif o in ['-r', '--record']:
        mode_function = start_stop_recording
    elif o in ['-x', '--extract']:
        opt_steps_range = parse_range_argument(a)
        mode_function = extract_from_test
    elif o in ['-D', '--delete']:
        opt_steps_range = parse_range_argument(a)
        mode_function = delete_steps_from_test
    elif o in ['-W', '--add-wait-step']:
        mode_function = add_wait_step
    elif o in ['-E', '--add-reschedule-step']:
        mode_function = add_reschedule_step

    # Modifiers
    elif o in ['-S', '--steps']:
        opt_steps_range = parse_range_argument(a)
    elif o in ['-v', '--verbose']:
        opt_verbose = True
    elif o == '--debug':
        opt_debug = True
    elif o in ['-R', '--repair']:
        opt_repair = True
    elif o in ['-i', '--interactive']:
        opt_interactive = True
    elif o in ['-c', '--continue']:
        opt_continue = True

# Main modes
try:
    test_names = []
    for a in args:
        if a.endswith(".mk"):
            test_name = a[:-3]  # Makes guitest wato.* easier
        else:
            test_name = a
        test_names.append(test_name)
    mode_function(test_names)

except Exception as e:
    # sys.exit() raises SystemExit, which is not caught here on purpose
    if opt_debug:
        raise
    bail_out(e)