virt.virt_test_utils: run_autotest - 'tar' needs relative paths to strip the leading '/'
[autotest-zwu.git] / client / common_lib / base_barrier.py
blob e1063a912b8ca106ab7c11217b0c55e9e202c306
1 import sys, socket, errno, logging
2 from time import time, sleep
3 from autotest_lib.client.common_lib import error
# Default barrier port: used by listen_server when no port is given, and by
# barrier when neither an explicit port nor a listen_server is supplied.
_DEFAULT_PORT = 11922
def get_host_from_id(hostid):
    """
    Return the host part of a barrier member id.

    A member id may carry a trailing local identifier after a '#'
    (e.g. "host#1"). This lets several members live on the same host,
    which is particularly helpful in testing. Everything from the first
    '#' onwards is discarded.

    @param hostid: Member id, either "host" or "host#local-tag".
    @returns: The portion of hostid before the first '#'.
    @raises error.BarrierError: if hostid starts with '#', i.e. no host
            address was given.
    """
    if hostid.startswith('#'):
        raise error.BarrierError(
            "Invalid Host id: Host Address should be specified")
    return hostid.split('#', 1)[0]
class BarrierAbortError(error.BarrierError):
    """
    BarrierError subclass signalling that a barrier member explicitly
    requested an abort (by calling rendezvous with abort=True) rather than
    the barrier failing on its own.
    """
    pass
class listen_server(object):
    """
    Manages a listening socket for barrier.

    Can be used to run multiple barrier instances with the same listening
    socket (if they were going to listen on the same port).

    Instances may also be used as context managers; the listening socket
    is closed when the with-block exits.

    Attributes:

    @attr address: Address to bind to (string).
    @attr port: Port to bind to.
    @attr socket: Listening socket object.
    """
    def __init__(self, address='', port=_DEFAULT_PORT):
        """
        Create a listen_server instance for the given address/port.

        @param address: The address to listen on.
        @param port: The port to listen on.

        @raises socket.error: if the socket cannot be configured, bound or
                put into listening mode (no descriptor is leaked).
        """
        self.address = address
        self.port = port
        self.socket = self._setup()


    def _setup(self):
        """Create, bind and listen on the listening socket."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ready = False
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((self.address, self.port))
            sock.listen(10)
            ready = True
        finally:
            # If bind/listen failed (e.g. port already in use) close the
            # half-set-up socket instead of leaking it; the original error
            # still propagates to the caller.
            if not ready:
                sock.close()

        return sock


    def close(self):
        """Close the listening socket."""
        self.socket.close()


    def __enter__(self):
        """Return self so the instance can be used in a with-statement."""
        return self


    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the listening socket; never suppress exceptions."""
        self.close()
        return False
class barrier(object):
    """Multi-machine barrier support.

    Provides multi-machine barrier mechanism.
    Execution stops until all members arrive at the barrier.

    Implementation Details:
    .......................

    When a barrier is forming the master node (first in sort order) in the
    set accepts connections from each member of the set. As they arrive
    they indicate the barrier they are joining and their identifier (their
    hostname or IP address and optional tag). They are then asked to wait.
    When all members are present the master node then checks that each
    member is still responding via a ping/pong exchange. If this is
    successful then everyone has checked in at the barrier. We then tell
    everyone they may continue via a rlse message.

    Where the master is not the first to reach the barrier the client
    connections will fail. Clients will retry until they either succeed in
    connecting to the master or the overall timeout is exceeded.

    As an example here is the exchange for a three node barrier called
    'TAG'

      MASTER                        CLIENT1         CLIENT2
        <-------------TAG C1-------------
        --------------wait-------------->
                      [...]
        <-------------TAG C2-----------------------------
        --------------wait------------------------------>
                      [...]
        --------------ping-------------->
        <-------------pong---------------
        --------------ping------------------------------>
        <-------------pong-------------------------------
                ----- BARRIER conditions MET -----
        --------------rlse-------------->
        --------------rlse------------------------------>

    Note that once the last client has responded with pong the barrier is
    implicitly deemed satisfied, they have all acknowledged their presence.
    If we fail to send any of the rlse messages the barrier is still a
    success, the failed host has effectively broken 'right at the beginning'
    of the post barrier execution window.

    In addition, there is another rendezvous, that makes each slave a server
    and the master a client. The connection process and usage is still the
    same but allows barriers from machines that only have a one-way
    connection initiation. This is called rendezvous_servers.

    For example:
        if ME == SERVER:
            server start

        b = job.barrier(ME, 'server-up', 120)
        b.rendezvous(CLIENT, SERVER)

        if ME == CLIENT:
            client run

        b = job.barrier(ME, 'test-complete', 3600)
        b.rendezvous(CLIENT, SERVER)

        if ME == SERVER:
            server stop

    Any client can also request an abort of the job by setting
    abort=True in the rendezvous arguments.
    """

    def __init__(self, hostid, tag, timeout=None, port=None,
                 listen_server=None):
        """
        @param hostid: My hostname/IP address + optional tag.
        @param tag: Symbolic name of the barrier in progress.
        @param timeout: Maximum seconds to wait for the barrier to meet.
        @param port: Port number to listen on.
        @param listen_server: External listen_server instance to use instead
                of creating our own. Create a listen_server instance and
                reuse it across multiple barrier instances so that the
                barrier code doesn't try to quickly re-bind on the same port
                (packets still in transit for the previous barrier may
                reset new connections).

        @raises error.BarrierError: if both port and listen_server are given.
        """
        self._hostid = hostid
        self._tag = tag
        if listen_server is not None:
            if port:
                raise error.BarrierError(
                    '"port" and "listen_server" are mutually exclusive.')
            self._port = listen_server.port
        else:
            self._port = port or _DEFAULT_PORT
        self._server = listen_server  # A listen_server instance or None.
        self._members = []  # List of hosts we expect to find at the barrier.
        self._timeout_secs = timeout
        self._start_time = None  # Timestamp of when we started waiting.
        self._masterid = None  # Host/IP + optional tag of selected master.
        # Whether this member requests an abort; (re)set by each rendezvous
        # call, initialised here so the attribute always exists.
        self._abort = False
        logging.info("tag=%s port=%d timeout=%r",
                     self._tag, self._port, self._timeout_secs)

        # Number of clients seen (should be the length of self._waiting).
        self._seen = 0

        # Clients who have checked in and are waiting (if we are a master).
        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.


    def _update_timeout(self, timeout):
        """Arrange for `timeout` more seconds to remain, counted from now.

        Before the barrier has started (no start time) the timeout is
        simply replaced.
        """
        if timeout is not None and self._start_time is not None:
            self._timeout_secs = (time() - self._start_time) + timeout
        else:
            self._timeout_secs = timeout


    def _remaining(self):
        """Return the seconds left before the barrier times out.

        @returns: Remaining seconds, or None when there is no timeout.
        @raises error.BarrierError: if the timeout has already expired.
        """
        if self._timeout_secs is not None and self._start_time is not None:
            timeout = self._timeout_secs - (time() - self._start_time)
            if timeout <= 0:
                errmsg = "timeout waiting for barrier: %s" % self._tag
                # Log the message itself (previously the `error` module
                # object was logged by mistake).
                logging.error(errmsg)
                raise error.BarrierError(errmsg)
        else:
            timeout = self._timeout_secs

        if self._timeout_secs is not None:
            logging.info("seconds remaining: %d", timeout)
        return timeout


    def _master_welcome(self, connection):
        """Handshake, on the master side, with one barrier member.

        Reads "<tag> <name>" from the member. Replies "!tag" for the wrong
        barrier, "!dup" for an already-registered name, otherwise "wait"
        and records the connection in self._waiting.

        @param connection: (socket, address) tuple for the member.
        """
        client, addr = connection

        client.settimeout(5)
        try:
            # Get the client's name.
            intro = client.recv(1024)
            intro = intro.strip("\r\n")

            intro_parts = intro.split(' ', 2)
            if len(intro_parts) != 2:
                logging.warning("Ignoring invalid data from %s: %r",
                                client.getpeername(), intro)
                client.close()
                return
            tag, name = intro_parts

            logging.info("new client tag=%s, name=%s", tag, name)

            # Ok, we know who is trying to attach. Confirm that
            # they are coming to the same meeting. Also, everyone
            # should be using a unique handle (their IP address).
            # If we see a duplicate, something _bad_ has happened
            # so drop them now.
            if self._tag != tag:
                logging.warning("client arriving for the wrong barrier: "
                                "%s != %s", self._tag, tag)
                client.send("!tag")
                client.close()
                return
            elif name in self._waiting:
                logging.warning("duplicate client")
                client.send("!dup")
                client.close()
                return

            # Acknowledge the client
            client.send("wait")

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.warning("client handshake timeout: (%s:%d)",
                            addr[0], addr[1])
            client.close()
            return

        logging.info("client now waiting: %s (%s:%d)",
                     name, addr[0], addr[1])

        # They seem to be valid, record them.
        self._waiting[name] = connection
        self._seen += 1


    def _slave_hello(self, connection):
        """Handshake, on the slave side, with the master.

        Sends "<tag> <hostid>" and expects "wait" back; on success the
        connection is recorded under our own hostid and self._seen is 1.

        @param connection: (socket, address) tuple for the master.
        """
        (client, addr) = connection

        client.settimeout(5)
        try:
            client.send(self._tag + " " + self._hostid)

            reply = client.recv(4)
            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            # Confirm the master accepted the connection.
            if reply != "wait":
                logging.warning("Bad connection request to master")
                client.close()
                return

        except socket.timeout:
            # This is nominally an error, but as we do not know
            # who that was we cannot do anything sane other
            # than report it and let the normal timeout kill
            # us when that's appropriate.
            logging.error("master handshake timeout: (%s:%d)",
                          addr[0], addr[1])
            client.close()
            return

        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])

        # They seem to be valid, record them.
        self._waiting[self._hostid] = connection
        self._seen = 1


    def _master_release(self):
        """Ping every waiting member, then release (or abort) them all.

        @raises error.BarrierError: if any member fails to answer the ping.
        @raises BarrierAbortError: if we or any member requested an abort.
        """
        # Check everyone is still there, that they have not
        # crashed or disconnected in the meantime.
        allpresent = True
        abort = self._abort
        for name, (client, addr) in self._waiting.items():
            logging.info("checking client present: %s", name)

            client.settimeout(5)
            reply = 'none'
            try:
                client.send("ping")
                reply = client.recv(1024)
            except socket.timeout:
                logging.warning("ping/pong timeout: %s", name)
            except socket.error:
                # A reset/closed connection means the member is gone:
                # report it as a lost client instead of crashing out with
                # a raw socket error.
                logging.warning("ping/pong failed: %s: %s",
                                name, sys.exc_info()[1])

            if reply == 'abrt':
                logging.warning("Client %s requested abort", name)
                abort = True
            elif reply != "pong":
                allpresent = False

        if not allpresent:
            raise error.BarrierError("master lost client")

        if abort:
            logging.info("Aborting the clients")
            msg = 'abrt'
        else:
            logging.info("Releasing clients")
            msg = 'rlse'

        # If every ones checks in then commit the release. The barrier is
        # already met here, so a failed send is only logged.
        for name, (client, addr) in self._waiting.items():
            client.settimeout(5)
            try:
                client.send(msg)
            except socket.timeout:
                logging.warning("release timeout: %s", name)
            except socket.error:
                logging.warning("release failed: %s: %s",
                                name, sys.exc_info()[1])

        if abort:
            raise BarrierAbortError("Client requested abort")


    def _waiting_close(self):
        """Close every connection recorded in self._waiting (best effort)."""
        # Either way, close out all the clients. If we have
        # not released them then they know to abort.
        for name, (client, addr) in self._waiting.items():
            logging.info("closing client: %s", name)

            try:
                client.close()
            except socket.error:
                # Keep going: one bad socket must not stop the others
                # from being closed.
                pass


    def _try_finish(self, is_master):
        """Complete the barrier exchange if everyone needed has arrived.

        @param is_master: True if this host is the barrier master.
        @returns: True once the release/wait exchange has run (the caller
                should stop accepting/connecting), False to keep waiting.
        """
        if is_master:
            # Check if everyone is here.
            logging.info("master seen %d of %d",
                         self._seen, len(self._members))
            if self._seen == len(self._members):
                self._master_release()
                return True
        elif self._seen:
            # Check if master connected.
            logging.info("slave connected to master")
            self._slave_wait()
            return True
        return False


    def _run_server(self, is_master):
        """Accept connections until the barrier completes or times out.

        Uses the shared listen_server if one was supplied, otherwise a
        temporary one that is closed on exit. All member connections are
        closed on exit, whether or not the barrier succeeded.

        @param is_master: True if this host is the barrier master.
        """
        server = self._server or listen_server(port=self._port)
        try:
            while True:
                try:
                    # Wait for callers welcoming each.
                    server.socket.settimeout(self._remaining())
                    connection = server.socket.accept()
                    if is_master:
                        self._master_welcome(connection)
                    else:
                        self._slave_hello(connection)
                except socket.timeout:
                    logging.warning("timeout waiting for remaining clients")

                if self._try_finish(is_master):
                    break
        finally:
            self._waiting_close()
            # if we created the listening_server in the beginning of this
            # function then close the listening socket here
            if not self._server:
                server.close()


    def _run_client(self, is_master):
        """Connect out to the peer(s) until the barrier completes or times out.

        As master (rendezvous_servers) each slave is dialled in turn; as
        slave (rendezvous) the master is dialled. Connect timeouts and
        refused connections are retried after 10 seconds; the overall
        timeout is enforced by _remaining(). All member connections are
        closed on exit, whether or not the barrier succeeded.

        @param is_master: True if this host is the barrier master.
        """
        try:
            while True:
                remaining = self._remaining()
                if remaining is not None and remaining <= 0:
                    break

                if is_master:
                    # Connect to all slaves, one at a time.
                    host = get_host_from_id(self._members[self._seen])
                    logging.info("calling slave: %s", host)
                    handshake = self._master_welcome
                else:
                    # Just connect to the master.
                    host = get_host_from_id(self._masterid)
                    logging.info("calling master")
                    handshake = self._slave_hello

                remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                remote.settimeout(30)
                connection = (remote, (host, self._port))
                try:
                    remote.connect(connection[1])
                    handshake(connection)
                except socket.timeout:
                    # Don't leak one socket per retry.
                    remote.close()
                    logging.warning("timeout calling host, retry")
                    sleep(10)
                except socket.error:
                    err = sys.exc_info()[1]
                    remote.close()
                    # Only "connection refused" (peer not listening yet) is
                    # retried; any other socket error is a real failure.
                    if not err.args or err.args[0] != errno.ECONNREFUSED:
                        raise
                    sleep(10)

                if self._try_finish(is_master):
                    break
        finally:
            # Close member connections even when release/wait raised.
            self._waiting_close()


    def _slave_wait(self):
        """Follow the master's control messages until it closes the link.

        Each "ping" is answered with "pong" (or "abrt" if this member is
        aborting). The last message seen before the master closes the
        connection decides the outcome.

        @raises error.BarrierError: if the master closed without a release.
        @raises BarrierAbortError: if the master relayed an abort.
        """
        remote = self._waiting[self._hostid][0]
        mode = "wait"
        while True:
            # All control messages are the same size to allow
            # us to split individual messages easily.
            remote.settimeout(self._remaining())
            reply = remote.recv(4)
            if not reply:
                break

            reply = reply.strip("\r\n")
            logging.info("master said: %s", reply)

            mode = reply
            if reply == "ping":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                if self._abort:
                    msg = "abrt"
                else:
                    msg = "pong"
                logging.info(msg)
                remote.settimeout(self._remaining())
                remote.send(msg)

            elif reply == "rlse" or reply == "abrt":
                # Ensure we have sufficient time for the
                # ping/pong/rlse cycle to complete normally.
                self._update_timeout(10 + 10 * len(self._members))

                logging.info("was released, waiting for close")

        if mode == "rlse":
            return
        elif mode == "wait":
            raise error.BarrierError("master abort -- barrier timeout")
        elif mode == "ping":
            raise error.BarrierError("master abort -- client lost")
        elif mode == "!tag":
            raise error.BarrierError("master abort -- incorrect tag")
        elif mode == "!dup":
            raise error.BarrierError("master abort -- duplicate client")
        elif mode == "abrt":
            raise BarrierAbortError("Client requested abort")
        else:
            raise error.BarrierError("master handshake failure: " + mode)


    def rendezvous(self, *hosts, **dargs):
        """Meet every host in `hosts` at this barrier.

        The first host in sort order becomes the master and listens for
        the others; every other host connects to it.

        @param hosts: All members of the barrier, including this host.
                Must not be empty.
        @param abort: (keyword, default False) if True, request an abort;
                this raises an exception on all the members.

        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any member requested an abort.
        """
        # if called with abort=True, this will raise an exception
        # on all the clients.
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = self._members.pop(0)
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_server(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_client(is_master=False)


    def rendezvous_servers(self, masterid, *hosts, **dargs):
        """Meet at a barrier where the master dials out to every member.

        Each non-master member listens for the master's connection and the
        master connects to each member in turn, for setups where
        connections can only be initiated in one direction.

        @param masterid: Host id of the master.
        @param hosts: The members the master connects to (i.e. excluding
                masterid).
        @param abort: (keyword, default False) if True, request an abort;
                this raises an exception on all the members.

        @raises error.BarrierError: on timeout or protocol failure.
        @raises BarrierAbortError: if any member requested an abort.
        """
        # if called with abort=True, this will raise an exception
        # on all the clients.
        self._start_time = time()
        self._members = sorted(hosts)
        self._masterid = masterid
        self._abort = dargs.get('abort', False)

        logging.info("masterid: %s", self._masterid)
        if self._abort:
            logging.debug("%s is aborting", self._hostid)
        if not len(self._members):
            logging.info("No other members listed.")
            return
        logging.info("members: %s", ",".join(self._members))

        self._seen = 0
        self._waiting = {}

        # Figure out who is the master in this barrier.
        if self._hostid == self._masterid:
            logging.info("selected as master")
            self._run_client(is_master=True)
        else:
            logging.info("selected as slave")
            self._run_server(is_master=False)