1 import sys
, socket
, errno
, logging
2 from time
import time
, sleep
3 from autotest_lib
.client
.common_lib
import error
def get_host_from_id(hostid):
    """Return the host address part of a barrier host id.

    A host id is a host address optionally followed by '#' and a local
    identifier; the suffix is what allows multiple members per host.

    @param hostid: Host id string, e.g. 'hostname' or 'hostname#tag'.
    @return: The text preceding the first '#' (the whole string when
            there is no '#').
    @raises error.BarrierError: If hostid starts with '#', i.e. carries
            no host address at all.
    """
    # Guard first: an id that is nothing but a local identifier is invalid.
    if hostid.startswith('#'):
        raise error.BarrierError(
                "Invalid Host id: Host Address should be specified")

    host_address, _, _ = hostid.partition('#')
    return host_address
class BarrierAbortError(error.BarrierError):
    """BarrierError subclass raised when an abort was explicitly requested."""
23 class listen_server(object):
25 Manages a listening socket for barrier.
27 Can be used to run multiple barrier instances with the same listening
28 socket (if they were going to listen on the same port).
32 @attr address: Address to bind to (string).
33 @attr port: Port to bind to.
34 @attr socket: Listening socket object.
36 def __init__(self
, address
='', port
=_DEFAULT_PORT
):
38 Create a listen_server instance for the given address/port.
40 @param address: The address to listen on.
41 @param port: The port to listen on.
43 self
.address
= address
45 self
.socket
= self
._setup
()
49 """Create, bind and listen on the listening socket."""
50 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
51 sock
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
52 sock
.bind((self
.address
, self
.port
))
59 """Close the listening socket."""
63 class barrier(object):
64 """Multi-machine barrier support.
66 Provides multi-machine barrier mechanism.
67 Execution stops until all members arrive at the barrier.
69 Implementation Details:
70 .......................
72 When a barrier is forming the master node (first in sort order) in the
73 set accepts connections from each member of the set. As they arrive
74 they indicate the barrier they are joining and their identifier (their
75 hostname or IP address and optional tag). They are then asked to wait.
76 When all members are present the master node then checks that each
77 member is still responding via a ping/pong exchange. If this is
78 successful then everyone has checked in at the barrier. We then tell
79 everyone they may continue via a rlse message.
81 Where the master is not the first to reach the barrier the client
82 connections will fail. Clients will retry until they either succeed in
83 connecting to master or the overall timeout is exceeded.
85 As an example here is the exchange for a three node barrier called
88 MASTER CLIENT1 CLIENT2
89 <-------------TAG C1-------------
90 --------------wait-------------->
92 <-------------TAG C2-----------------------------
93 --------------wait------------------------------>
95 --------------ping-------------->
96 <-------------pong---------------
97 --------------ping------------------------------>
98 <-------------pong-------------------------------
99 ----- BARRIER conditions MET -----
100 --------------rlse-------------->
101 --------------rlse------------------------------>
103 Note that once the last client has responded with pong the barrier is
104 implicitly deemed satisfied: they have all acknowledged their presence.
105 If we fail to send any of the rlse messages the barrier is still a
106 success, the failed host has effectively broken 'right at the beginning'
107 of the post barrier execution window.
109 In addition, there is another rendezvous, that makes each slave a server
110 and the master a client. The connection process and usage is still the
111 same but allows barriers from machines that only have a one-way
112 connection initiation. This is called rendezvous_servers.
118 b = job.barrier(ME, 'server-up', 120)
119 b.rendezvous(CLIENT, SERVER)
124 b = job.barrier(ME, 'test-complete', 3600)
125 b.rendezvous(CLIENT, SERVER)
130 Any client can also request an abort of the job by setting
131 abort=True in the rendezvous arguments.
134 def __init__(self
, hostid
, tag
, timeout
=None, port
=None,
137 @param hostid: My hostname/IP address + optional tag.
138 @param tag: Symbolic name of the barrier in progress.
139 @param timeout: Maximum seconds to wait for a the barrier to meet.
140 @param port: Port number to listen on.
141 @param listen_server: External listen_server instance to use instead
142 of creating our own. Create a listen_server instance and
143 reuse it across multiple barrier instances so that the
144 barrier code doesn't try to quickly re-bind on the same port
145 (packets still in transit for the previous barrier they may
146 reset new connections).
148 self
._hostid
= hostid
152 raise error
.BarrierError(
153 '"port" and "listen_server" are mutually exclusive.')
154 self
._port
= listen_server
.port
156 self
._port
= port
or _DEFAULT_PORT
157 self
._server
= listen_server
# A listen_server instance or None.
158 self
._members
= [] # List of hosts we expect to find at the barrier.
159 self
._timeout
_secs
= timeout
160 self
._start
_time
= None # Timestamp of when we started waiting.
161 self
._masterid
= None # Host/IP + optional tag of selected master.
162 logging
.info("tag=%s port=%d timeout=%r",
163 self
._tag
, self
._port
, self
._timeout
_secs
)
165 # Number of clients seen (should be the length of self._waiting).
168 # Clients who have checked in and are waiting (if we are a master).
169 self
._waiting
= {} # Maps from hostname -> (client, addr) tuples.
172 def _update_timeout(self
, timeout
):
173 if timeout
is not None and self
._start
_time
is not None:
174 self
._timeout
_secs
= (time() - self
._start
_time
) + timeout
176 self
._timeout
_secs
= timeout
179 def _remaining(self
):
180 if self
._timeout
_secs
is not None and self
._start
_time
is not None:
181 timeout
= self
._timeout
_secs
- (time() - self
._start
_time
)
183 errmsg
= "timeout waiting for barrier: %s" % self
._tag
185 raise error
.BarrierError(errmsg
)
187 timeout
= self
._timeout
_secs
189 if self
._timeout
_secs
is not None:
190 logging
.info("seconds remaining: %d", timeout
)
194 def _master_welcome(self
, connection
):
195 client
, addr
= connection
200 # Get the clients name.
201 intro
= client
.recv(1024)
202 intro
= intro
.strip("\r\n")
204 intro_parts
= intro
.split(' ', 2)
205 if len(intro_parts
) != 2:
206 logging
.warn("Ignoring invalid data from %s: %r",
207 client
.getpeername(), intro
)
210 tag
, name
= intro_parts
212 logging
.info("new client tag=%s, name=%s", tag
, name
)
214 # Ok, we know who is trying to attach. Confirm that
215 # they are coming to the same meeting. Also, everyone
216 # should be using a unique handle (their IP address).
217 # If we see a duplicate, something _bad_ has happened
220 logging
.warn("client arriving for the wrong barrier: %s != %s",
226 elif name
in self
._waiting
:
227 logging
.warn("duplicate client")
233 # Acknowledge the client
236 except socket
.timeout
:
237 # This is nominally an error, but as we do not know
238 # who that was we cannot do anything sane other
239 # than report it and let the normal timeout kill
240 # us when that's appropriate.
241 logging
.warn("client handshake timeout: (%s:%d)",
246 logging
.info("client now waiting: %s (%s:%d)",
247 name
, addr
[0], addr
[1])
249 # They seem to be valid; record them.
250 self
._waiting
[name
] = connection
254 def _slave_hello(self
, connection
):
255 (client
, addr
) = connection
260 client
.send(self
._tag
+ " " + self
._hostid
)
262 reply
= client
.recv(4)
263 reply
= reply
.strip("\r\n")
264 logging
.info("master said: %s", reply
)
266 # Confirm the master accepted the connection.
268 logging
.warn("Bad connection request to master")
272 except socket
.timeout
:
273 # This is nominally an error, but as we do not know
274 # who that was we cannot do anything sane other
275 # than report it and let the normal timeout kill
276 # us when that's appropriate.
277 logging
.error("master handshake timeout: (%s:%d)",
282 logging
.info("slave now waiting: (%s:%d)", addr
[0], addr
[1])
284 # They seem to be valid; record them.
285 self
._waiting
[self
._hostid
] = connection
289 def _master_release(self
):
290 # Check everyone is still there, that they have not
291 # crashed or disconnected in the meantime.
294 for name
in self
._waiting
:
295 (client
, addr
) = self
._waiting
[name
]
297 logging
.info("checking client present: %s", name
)
303 reply
= client
.recv(1024)
304 except socket
.timeout
:
305 logging
.warn("ping/pong timeout: %s", name
)
309 logging
.warn("Client %s requested abort", name
)
311 elif reply
!= "pong":
315 raise error
.BarrierError("master lost client")
318 logging
.info("Aborting the clients")
321 logging
.info("Releasing clients")
324 # If everyone checks in then commit the release.
325 for name
in self
._waiting
:
326 (client
, addr
) = self
._waiting
[name
]
331 except socket
.timeout
:
332 logging
.warn("release timeout: %s", name
)
336 raise BarrierAbortError("Client requested abort")
339 def _waiting_close(self
):
340 # Either way, close out all the clients. If we have
341 # not released them then they know to abort.
342 for name
in self
._waiting
:
343 (client
, addr
) = self
._waiting
[name
]
345 logging
.info("closing client: %s", name
)
353 def _run_server(self
, is_master
):
354 server
= self
._server
or listen_server(port
=self
._port
)
359 # Wait for callers welcoming each.
360 server
.socket
.settimeout(self
._remaining
())
361 connection
= server
.socket
.accept()
363 self
._master
_welcome
(connection
)
365 self
._slave
_hello
(connection
)
366 except socket
.timeout
:
367 logging
.warn("timeout waiting for remaining clients")
371 # Check if everyone is here.
372 logging
.info("master seen %d of %d",
373 self
._seen
, len(self
._members
))
374 if self
._seen
== len(self
._members
):
375 self
._master
_release
()
378 # Check if master connected.
380 logging
.info("slave connected to master")
384 self
._waiting
_close
()
385 # if we created the listening_server in the beginning of this
386 # function then close the listening socket here
391 def _run_client(self
, is_master
):
392 while self
._remaining
() is None or self
._remaining
() > 0:
394 remote
= socket
.socket(socket
.AF_INET
,
396 remote
.settimeout(30)
398 # Connect to all slaves.
399 host
= get_host_from_id(self
._members
[self
._seen
])
400 logging
.info("calling slave: %s", host
)
401 connection
= (remote
, (host
, self
._port
))
402 remote
.connect(connection
[1])
403 self
._master
_welcome
(connection
)
405 # Just connect to the master.
406 host
= get_host_from_id(self
._masterid
)
407 logging
.info("calling master")
408 connection
= (remote
, (host
, self
._port
))
409 remote
.connect(connection
[1])
410 self
._slave
_hello
(connection
)
411 except socket
.timeout
:
412 logging
.warn("timeout calling host, retry")
415 except socket
.error
, err
:
417 if (code
!= errno
.ECONNREFUSED
):
422 # Check if everyone is here.
423 logging
.info("master seen %d of %d",
424 self
._seen
, len(self
._members
))
425 if self
._seen
== len(self
._members
):
426 self
._master
_release
()
429 # Check if master connected.
431 logging
.info("slave connected to master")
435 self
._waiting
_close
()
438 def _slave_wait(self
):
439 remote
= self
._waiting
[self
._hostid
][0]
442 # All control messages are the same size to allow
443 # us to split individual messages easily.
444 remote
.settimeout(self
._remaining
())
445 reply
= remote
.recv(4)
449 reply
= reply
.strip("\r\n")
450 logging
.info("master said: %s", reply
)
454 # Ensure we have sufficient time for the
455 # ping/pong/rlse cycle to complete normally.
456 self
._update
_timeout
(10 + 10 * len(self
._members
))
463 remote
.settimeout(self
._remaining
())
466 elif reply
== "rlse" or reply
== "abrt":
467 # Ensure we have sufficient time for the
468 # ping/pong/rlse cycle to complete normally.
469 self
._update
_timeout
(10 + 10 * len(self
._members
))
471 logging
.info("was released, waiting for close")
476 raise error
.BarrierError("master abort -- barrier timeout")
478 raise error
.BarrierError("master abort -- client lost")
480 raise error
.BarrierError("master abort -- incorrect tag")
482 raise error
.BarrierError("master abort -- duplicate client")
484 raise BarrierAbortError("Client requested abort")
486 raise error
.BarrierError("master handshake failure: " + mode
)
489 def rendezvous(self
, *hosts
, **dargs
):
490 # if called with abort=True, this will raise an exception
491 # on all the clients.
492 self
._start
_time
= time()
493 self
._members
= list(hosts
)
495 self
._masterid
= self
._members
.pop(0)
496 self
._abort
= dargs
.get('abort', False)
498 logging
.info("masterid: %s", self
._masterid
)
500 logging
.debug("%s is aborting", self
._hostid
)
501 if not len(self
._members
):
502 logging
.info("No other members listed.")
504 logging
.info("members: %s", ",".join(self
._members
))
509 # Figure out who is the master in this barrier.
510 if self
._hostid
== self
._masterid
:
511 logging
.info("selected as master")
512 self
._run
_server
(is_master
=True)
514 logging
.info("selected as slave")
515 self
._run
_client
(is_master
=False)
518 def rendezvous_servers(self
, masterid
, *hosts
, **dargs
):
519 # if called with abort=True, this will raise an exception
520 # on all the clients.
521 self
._start
_time
= time()
522 self
._members
= list(hosts
)
524 self
._masterid
= masterid
525 self
._abort
= dargs
.get('abort', False)
527 logging
.info("masterid: %s", self
._masterid
)
528 if not len(self
._members
):
529 logging
.info("No other members listed.")
531 logging
.info("members: %s", ",".join(self
._members
))
536 # Figure out who is the master in this barrier.
537 if self
._hostid
== self
._masterid
:
538 logging
.info("selected as master")
539 self
._run
_client
(is_master
=True)
541 logging
.info("selected as slave")
542 self
._run
_server
(is_master
=False)