2 import virt_utils
, virt_vm
, aexpect
7 A scheduler that manages several parallel test execution pipelines on a
def __init__(self, tests, num_workers, total_cpus, total_mem, bindir):
    """
    Store the scheduler parameters and create the per-worker pipes.

    @param tests: A list of test dictionaries.
    @param num_workers: The number of workers (pipelines).
    @param total_cpus: The total number of CPUs to dedicate to tests.
    @param total_mem: The total amount of memory to dedicate to tests.
    @param bindir: The directory where environment files reside.
    """
    # tests and bindir are read back later as self.tests / self.bindir
    # (test lookup by index, env file path), so they are stored here
    # together with the resource limits.
    self.tests = tests
    self.num_workers = num_workers
    self.total_cpus = total_cpus
    self.total_mem = total_mem
    self.bindir = bindir
    # Pipes -- s stands for scheduler, w stands for worker.
    # One (read_fd, write_fd) pair per worker and per direction.
    self.s2w = [os.pipe() for _ in range(num_workers)]
    self.w2s = [os.pipe() for _ in range(num_workers)]
    # Wrap the raw descriptors in file objects; the third argument (0)
    # requests unbuffered I/O.
    # NOTE(review): presumably unbuffered so that select() on the read
    # ends is not fooled by data sitting in a user-space buffer -- confirm.
    # NOTE(review): text mode with buffering=0 is Python 2 behaviour;
    # Python 3 rejects unbuffered text I/O with ValueError.
    self.s2w_r = [os.fdopen(r, "r", 0) for r, w in self.s2w]
    self.s2w_w = [os.fdopen(w, "w", 0) for r, w in self.s2w]
    self.w2s_r = [os.fdopen(r, "r", 0) for r, w in self.w2s]
    self.w2s_w = [os.fdopen(w, "w", 0) for r, w in self.w2s]
    # "Personal" worker dicts contain modifications that are applied
    # specifically to each worker. Here each worker gets its own
    # environment file name ("env0", "env1", ...).
    self.worker_dicts = [{"env": "env%d" % i} for i in range(num_workers)]
39 def worker(self
, index
, run_test_func
):
43 Waits for commands from the scheduler and processes them.
45 @param index: The index of this worker (in the range 0..num_workers-1).
46 @param run_test_func: A function to be called to run a test
51 self_dict
= self
.worker_dicts
[index
]
53 # Inform the scheduler this worker is ready
57 cmd
= r
.readline().split()
61 # The scheduler wants this worker to run a test
63 test_index
= int(cmd
[1])
64 test
= self
.tests
[test_index
].copy()
65 test
.update(self_dict
)
66 test_iterations
= int(test
.get("iterations", 1))
67 status
= run_test_func("kvm", params
=test
,
68 tag
=test
.get("shortname"),
69 iterations
=test_iterations
)
70 w
.write("done %s %s\n" % (test_index
, status
))
73 # The scheduler wants this worker to free its used resources
74 elif cmd
[0] == "cleanup":
75 env_filename
= os
.path
.join(self
.bindir
, self_dict
["env"])
76 env
= virt_utils
.Env(env_filename
)
77 for obj
in env
.values():
78 if isinstance(obj
, virt_vm
.VM
):
80 elif isinstance(obj
, aexpect
.Spawn
):
83 w
.write("cleanup_done\n")
86 # There's no more work for this worker
87 elif cmd
[0] == "terminate":
93 The scheduler function.
95 Sends commands to workers, telling them to run tests, clean up or
100 test_status
= ["waiting"] * len(self
.tests
)
101 test_worker
= [None] * len(self
.tests
)
102 used_cpus
= [0] * self
.num_workers
103 used_mem
= [0] * self
.num_workers
106 # Wait for a message from a worker
107 r
, w
, x
= select
.select(self
.w2s_r
, [], [])
109 someone_is_ready
= False
112 worker_index
= self
.w2s_r
.index(pipe
)
113 msg
= pipe
.readline().split()
117 # A worker is ready -- add it to the idle_workers list
118 if msg
[0] == "ready":
119 idle_workers
.append(worker_index
)
120 someone_is_ready
= True
122 # A worker completed a test
123 elif msg
[0] == "done":
124 test_index
= int(msg
[1])
125 test
= self
.tests
[test_index
]
126 status
= int(eval(msg
[2]))
127 test_status
[test_index
] = ("fail", "pass")[status
]
128 # If the test failed, mark all dependent tests as "failed" too
130 for i
, other_test
in enumerate(self
.tests
):
131 for dep
in other_test
.get("dep", []):
132 if dep
in test
["name"]:
133 test_status
[i
] = "fail"
135 # A worker is done shutting down its VMs and other processes
136 elif msg
[0] == "cleanup_done":
137 used_cpus
[worker_index
] = 0
138 used_mem
[worker_index
] = 0
139 closing_workers
.remove(worker_index
)
141 if not someone_is_ready
:
144 for worker
in idle_workers
[:]:
145 # Find a test for this worker
147 for i
, test
in enumerate(self
.tests
):
148 # We only want "waiting" tests
149 if test_status
[i
] != "waiting":
151 # Make sure the test isn't assigned to another worker
152 if test_worker
[i
] is not None and test_worker
[i
] != worker
:
154 # Make sure the test's dependencies are satisfied
155 dependencies_satisfied
= True
156 for dep
in test
["dep"]:
157 dependencies
= [j
for j
, t
in enumerate(self
.tests
)
159 bad_status_deps
= [j
for j
in dependencies
160 if test_status
[j
] != "pass"]
162 dependencies_satisfied
= False
164 if not dependencies_satisfied
:
166 # Make sure we have enough resources to run the test
167 test_used_cpus
= int(test
.get("used_cpus", 1))
168 test_used_mem
= int(test
.get("used_mem", 128))
169 # First make sure the other workers aren't using too many
170 # CPUs (not including the workers currently shutting down)
171 uc
= (sum(used_cpus
) - used_cpus
[worker
] -
172 sum(used_cpus
[i
] for i
in closing_workers
))
173 if uc
and uc
+ test_used_cpus
> self
.total_cpus
:
175 # ... or too much memory
176 um
= (sum(used_mem
) - used_mem
[worker
] -
177 sum(used_mem
[i
] for i
in closing_workers
))
178 if um
and um
+ test_used_mem
> self
.total_mem
:
180 # If we reached this point it means there are, or will
181 # soon be, enough resources to run the test
183 # Now check if the test can be run right now, i.e. if the
184 # other workers, including the ones currently shutting
185 # down, aren't using too many CPUs
186 uc
= (sum(used_cpus
) - used_cpus
[worker
])
187 if uc
and uc
+ test_used_cpus
> self
.total_cpus
:
189 # ... or too much memory
190 um
= (sum(used_mem
) - used_mem
[worker
])
191 if um
and um
+ test_used_mem
> self
.total_mem
:
193 # Everything is OK -- run the test
194 test_status
[i
] = "running"
195 test_worker
[i
] = worker
196 idle_workers
.remove(worker
)
197 # Update used_cpus and used_mem
198 used_cpus
[worker
] = test_used_cpus
199 used_mem
[worker
] = test_used_mem
200 # Assign all related tests to this worker
201 for j
, other_test
in enumerate(self
.tests
):
202 for other_dep
in other_test
["dep"]:
203 # All tests that depend on this test
204 if other_dep
in test
["name"]:
205 test_worker
[j
] = worker
207 # ... and all tests that share a dependency
209 for dep
in test
["dep"]:
210 if dep
in other_dep
or other_dep
in dep
:
211 test_worker
[j
] = worker
213 # Tell the worker to run the test
214 self
.s2w_w
[worker
].write("run %s\n" % i
)
217 # If there won't be any tests for this worker to run soon, tell
218 # the worker to free its used resources
219 if not test_found
and (used_cpus
[worker
] or used_mem
[worker
]):
220 self
.s2w_w
[worker
].write("cleanup\n")
221 idle_workers
.remove(worker
)
222 closing_workers
.append(worker
)
224 # If there are no more new tests to run, terminate the workers and
226 if len(idle_workers
) == self
.num_workers
:
227 for worker
in idle_workers
:
228 self
.s2w_w
[worker
].write("terminate\n")