selftests: Introduce a new test case to tc testsuite
[linux-2.6/btrfs-unstable.git] / tools / testing / selftests / tc-testing / tdc.py
blob5f11f5d7456e7c1488a2dabe38a81a5a5f949eea
1 #!/usr/bin/env python3
3 """
4 tdc.py - Linux tc (Traffic Control) unit test driver
6 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
7 """
9 import re
10 import os
11 import sys
12 import argparse
13 import json
14 import subprocess
15 from collections import OrderedDict
16 from string import Template
18 from tdc_config import *
19 from tdc_helper import *
# When true, exec_cmd() prefixes commands with "ip netns exec $NS" (unless
# called with nsonly=False) and ns_create()/ns_destroy() build and remove
# the test namespace; when false those two functions do nothing.
USE_NS = True
def replace_keywords(cmd):
    """
    Expand the $-style placeholders found in a command string.

    Placeholders whose name is a key of NAMES are replaced by the matching
    value. safe_substitute() is used, so placeholders without an entry in
    NAMES are left in the string untouched instead of raising.
    """
    return Template(cmd).safe_substitute(NAMES)
def exec_cmd(command, nsonly=True):
    """
    Run a command through the shell and collect its output.

    When USE_NS is set and nsonly is true, the command is first wrapped in
    'ip netns exec $NS' so it runs inside the test namespace. Any '$'
    placeholders are then expanded with replace_keywords().

    Returns a (proc, output) tuple: the finished Popen object and, decoded
    as UTF-8, stdout when the command exited with 0 or stderr otherwise.
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    proc = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out_bytes, err_bytes = proc.communicate()

    # Report the error stream for failed commands, regular output otherwise.
    selected = out_bytes if proc.returncode == 0 else err_bytes
    foutput = selected.decode("utf-8")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry of cmdlist is either a command string, which must exit with
    status 0, or a list whose first element is the command and whose
    remaining elements are the exit codes to accept. Empty commands are
    skipped.

    If a command finishes with an exit code that is not accepted, the
    command and its output are printed, the test namespace is destroyed
    and the script exits with status 1.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # print() with parentheses: a bare `print` is a no-op in
            # Python 3 and never emitted the intended blank line.
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            ns_destroy()
            sys.exit(1)
91 def test_runner(filtered_tests, args):
92 """
93 Driver function for the unit tests.
95 Prints information about the tests being run, executes the setup and
96 teardown commands and the command under test itself. Also determines
97 success/failure based on the information in the test case and generates
98 TAP output accordingly.
99 """
100 testlist = filtered_tests
101 tcount = len(testlist)
102 index = 1
103 tap = str(index) + ".." + str(tcount) + "\n"
105 for tidx in testlist:
106 result = True
107 tresult = ""
108 if "flower" in tidx["category"] and args.device == None:
109 continue
110 print("Test " + tidx["id"] + ": " + tidx["name"])
111 prepare_env(tidx["setup"])
112 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
113 exit_code = p.returncode
115 if (exit_code != int(tidx["expExitCode"])):
116 result = False
117 print("exit:", exit_code, int(tidx["expExitCode"]))
118 print(procout)
119 else:
120 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
121 (p, procout) = exec_cmd(tidx["verifyCmd"])
122 match_index = re.findall(match_pattern, procout)
123 if len(match_index) != int(tidx["matchCount"]):
124 result = False
126 if result == True:
127 tresult += "ok "
128 else:
129 tresult += "not ok "
130 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
132 if result == False:
133 tap += procout
135 prepare_env(tidx["teardown"])
136 index += 1
138 return tap
def ns_create():
    """
    Create the network namespace in which the tests will be run and set up
    the required network devices for it.

    Does nothing unless USE_NS is set. A veth pair $DEV0/$DEV1 is created,
    $DEV1 and $DEV2 are moved into namespace $NS, and all of them are
    brought up. The commands run outside the namespace (nsonly=False) and
    their results are not checked.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        # "-n $NS" selects the namespace to operate in. The previous "-s"
        # is the statistics flag, which made ip treat $NS as the object
        # name and fail, leaving the device down.
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    for cmd in setup_cmds:
        exec_cmd(cmd, False)
def ns_destroy():
    """
    Remove the test network namespace $NS.

    Does nothing unless USE_NS is set. The delete command is run outside
    the namespace (nsonly=False).
    """
    if not USE_NS:
        return
    exec_cmd('ip netns delete $NS', False)
def has_blank_ids(idlist):
    """
    Tell whether the list contains at least one empty (falsy) ID.

    Returns True if any entry is falsy, False otherwise (including for an
    empty list).
    """
    return any(not entry for entry in idlist)
def load_from_file(filename):
    """
    Load the test cases stored in a JSON file.

    JSON objects are decoded as OrderedDict so their key order is kept.
    If any test case in the file has a blank ID, every test case loaded
    from it is tagged with a 'filename' key holding the file it came from.

    Returns the decoded test cases.
    """
    with open(filename) as test_data:
        cases = json.load(test_data, object_pairs_hook=OrderedDict)

    if has_blank_ids(get_id_list(cases)):
        for case in cases:
            case['filename'] = filename
    return cases
def args_parse():
    """
    Build the argument parser for tdc, without any options attached yet.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
def set_args(parser):
    """
    Register tdc's command line options on the given parser.

    Options are added in the order listed below, which is also the order
    they appear in the help output. Returns the same parser object.
    """
    option_table = (
        (('-p', '--path'),
         dict(type=str,
              help='The full path to the tc executable to use')),
        (('-c', '--category'),
         dict(type=str, nargs='?', const='+c',
              help='Run tests only from the specified category, or if no category is specified, list known categories.')),
        (('-f', '--file'),
         dict(type=str,
              help='Run tests from the specified file')),
        (('-l', '--list'),
         dict(type=str, nargs='?', const="", metavar='CATEGORY',
              help='List all test cases, or those only within the specified category')),
        (('-s', '--show'),
         dict(type=str, nargs=1, metavar='ID', dest='showID',
              help='Display the test case with specified id')),
        (('-e', '--execute'),
         dict(type=str, nargs=1, metavar='ID',
              help='Execute the single test case with specified ID')),
        (('-i', '--id'),
         dict(action='store_true', dest='gen_id',
              help='Generate ID numbers for new test cases')),
        (('-d', '--device'),
         dict(help='Execute the test case in flower category')),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser
def check_default_settings(args):
    """
    Apply command line overrides to NAMES and validate the tc path.

    args.path, when given, replaces NAMES['TC']; args.device, when given,
    replaces NAMES['DEV2']. If NAMES['TC'] does not name an existing file
    afterwards, an error is printed and the script exits with status 1.
    """
    # Allow for overriding specific settings
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device

    tc_path = NAMES['TC']
    if os.path.isfile(tc_path):
        return

    print("The specified tc path " + tc_path + " does not exist.")
    exit(1)
def get_id_list(alltests):
    """
    Collect the "id" field of every test case, in input order.
    """
    return [case["id"] for case in alltests]
def check_case_id(alltests):
    """
    Find test case IDs that are used more than once.

    Returns a list with every occurrence of each repeated ID, in the order
    the IDs appear in alltests; the list is empty when all IDs are unique.
    """
    ids = get_id_list(alltests)
    return list(filter(lambda case_id: ids.count(case_id) > 1, ids))
def does_id_exist(alltests, newid):
    """
    Tell whether newid is already the ID of one of the test cases.
    """
    return newid in get_id_list(alltests)
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    New IDs are four lowercase hex digits and are re-drawn until they do
    not collide with an ID already present in alltests. Only test cases
    that carry a 'filename' key are written out; the key is removed from
    each of them first so it does not end up in the JSON file. Each file
    is rewritten with all tagged test cases that name it.

    Returns alltests (updated in place), so a caller that assigns the
    result keeps a usable list.
    """
    import random

    for case in alltests:
        if case["id"] == "":
            while True:
                newid = '%04x' % random.randrange(16**4)
                if not does_id_exist(alltests, newid):
                    case['id'] = newid
                    break

    # Group the tagged test cases by source file in a single pass. Each
    # file is then written exactly once, so it cannot be truncated by a
    # second write after its 'filename' tags have been removed.
    per_file = {}
    for case in alltests:
        if 'filename' in case:
            per_file.setdefault(case.pop('filename'), []).append(case)

    for path, cases in per_file.items():
        # The context manager closes the file even if json.dump() raises.
        with open(path, "w") as outfile:
            json.dump(cases, outfile, indent=4)

    return alltests
def get_test_cases(args):
    """
    Load the test cases selected by the command line.

    With args.file set, only that file is loaded; if it does not exist an
    error is printed and the script exits with status 1. Otherwise every
    '*.json' file found under the 'tc-tests' directory (relative to the
    current working directory) is loaded.

    Returns one list holding the test cases of all loaded files.
    """
    import fnmatch

    if args.file is None:
        flist = [
            os.path.join(root, filename)
            for root, dirnames, filenames in os.walk('tc-tests')
            for filename in fnmatch.filter(filenames, '*.json')
        ]
    else:
        if not os.path.isfile(args.file):
            print("The specified test case file " + args.file + " does not exist.")
            exit(1)
        flist = [args.file]

    alltests = []
    for casefile in flist:
        alltests = alltests + load_from_file(casefile)
    return alltests
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Modes, checked in this order:
      * -i: generate IDs for test cases with a blank ID, then exit.
      * -s ID: show one test case, then exit.
      * -c with no value: print the known categories, then exit.
      * -l [CATEGORY]: list all test cases or those of one category,
        then exit.
      * otherwise: run the tests (all, those whose ID contains the -e
        value, or those of the -c category). This needs root privileges.

    Exits with status 1 on duplicate test IDs, an unknown category, an
    unmatched test ID, a missing -d device for category "flower", or when
    not run as root.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        idlist = get_id_list(alltests)
        if has_blank_ids(idlist):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        sys.exit(0)

    target_id = args.execute[0] if args.execute else ""

    if args.category:
        if args.category == '+c':
            print("Available categories:")
            print_sll(ucat)
            sys.exit(0)
        target_category = args.category
    else:
        target_category = ""

    testcases = get_categorized_testlist(alltests, ucat)

    # args.list is None when -l was not given and "" when -l was given
    # without a category, so test against None rather than truthiness;
    # otherwise a bare -l would fall through and run the tests.
    if args.list is not None:
        if len(args.list) == 0:
            list_test_cases(alltests)
            sys.exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            sys.exit(1)
        list_test_cases(testcases[args.list])
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    # Validate the selection before creating the namespace, so that the
    # error exits below do not leave a stale namespace behind.
    if len(target_category) == 0:
        if len(target_id) > 0:
            alltests = list(filter(lambda x: target_id in x['id'], alltests))
            if len(alltests) == 0:
                print("Cannot find a test case with ID matching " + target_id)
                sys.exit(1)
        selected = alltests
        header = "All test results: "
    else:
        if target_category == "flower" and args.device is None:
            print("Please specify a NIC device (-d) to run category flower")
            sys.exit(1)
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            sys.exit(1)
        selected = testcases[target_category]
        header = "Category " + target_category

    ns_create()
    try:
        catresults = test_runner(selected, args)
        print(header + "\n\n" + catresults)
    finally:
        # Always remove the namespace, even if a test case raises.
        ns_destroy()
def main():
    """
    Entry point: build the argument parser, parse the command line, apply
    the settings overrides and dispatch to the requested operation.

    Unrecognised command line arguments are tolerated (parse_known_args)
    and ignored. Exits with status 0 when the operation returns.
    """
    parser = set_args(args_parse())
    args, _unrecognised = parser.parse_known_args()

    check_default_settings(args)
    set_operation_mode(args)

    exit(0)
# Run the test driver only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()