add max_builds= to BuildSlave, thanks to Dustin Mitchell. Closes #48.
[buildbot.git] / buildbot / scripts / tryclient.py
blob11f8e66d31241c44caeb539fdcd86f6178f47218
1 # -*- test-case-name: buildbot.test.test_scheduler,buildbot.test.test_vc -*-
3 import sys, os, re, time, random
4 from twisted.internet import utils, protocol, defer, reactor, task
5 from twisted.spread import pb
6 from twisted.cred import credentials
7 from twisted.python import log
8 from twisted.python.procutils import which
10 from buildbot.sourcestamp import SourceStamp
11 from buildbot.scripts import runner
12 from buildbot.util import now
13 from buildbot.status import builder
class SourceStampExtractor:
    """Base class for turning a local working tree into a SourceStamp.

    Subclasses provide the ``vcexe`` and ``patchlevel`` class attributes
    plus getBaseRevision() and getPatch(), which get() chains together.
    """

    def __init__(self, treetop, branch):
        self.branch = branch
        self.treetop = treetop
        # take the first match that which() reports for the VC executable
        candidates = which(self.vcexe)
        self.exe = candidates[0]

    def dovc(self, cmd):
        """Run the VC executable with the argument list 'cmd' (the command
        name itself is not part of 'cmd'). Returns a Deferred that fires
        with the command's stdout."""
        environment = os.environ.copy()
        # run the tool with LC_ALL=C (presumably so its output is not
        # localized before we parse it)
        environment['LC_ALL'] = "C"
        running = utils.getProcessOutputAndValue(self.exe, cmd,
                                                 env=environment,
                                                 path=self.treetop)
        running.addCallback(self._didvc, cmd)
        return running

    def _didvc(self, res, cmd):
        # 'bzr diff' sets rc=1 if there were any differences. tla, baz, and
        # cvs do something similar, so the exit code is deliberately ignored
        # and only stdout is passed along.
        out, _err, _rc = res
        return out

    def get(self):
        """Return a Deferred that fires with a SourceStamp instance."""
        chain = self.getBaseRevision()
        for step in (self.getPatch, self.done):
            chain.addCallback(step)
        return chain

    def readPatch(self, res, patchlevel):
        """Remember the diff text 'res' together with its patchlevel."""
        self.patch = (patchlevel, res)

    def done(self, res):
        """Build the SourceStamp from the branch, base revision, and patch
        collected by the earlier steps."""
        # TODO: figure out the branch too
        return SourceStamp(self.branch, self.baserev, self.patch)
class CVSExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a CVS working tree (trunk only)."""
    patchlevel = 0
    vcexe = "cvs"

    def getBaseRevision(self):
        """Use the current time, formatted in UTC, as the base revision."""
        # this depends upon our local clock and the repository's clock being
        # reasonably synchronized with each other. We express everything in
        # UTC because the '%z' format specifier for strftime doesn't always
        # work.
        utc_now = time.gmtime(now())
        self.baserev = time.strftime("%Y-%m-%d %H:%M:%S +0000", utc_now)
        return defer.succeed(None)

    def getPatch(self, res):
        """Run 'cvs diff' against the base timestamp and record the patch.

        Raises RuntimeError when a branch was requested.
        """
        if self.branch is not None:
            # 'cvs diff' won't take both -r and -D at the same time (it
            # ignores the -r). As best I can tell, there is no way to make
            # cvs give you a diff relative to a timestamp on the non-trunk
            # branch. A bare 'cvs diff' will tell you about the changes
            # relative to your checked-out versions, but I know of no way to
            # find out what those checked-out versions are.
            raise RuntimeError("Sorry, CVS 'try' builds don't work with "
                               "branches")
        # the -q tells CVS to not announce each directory as it works
        diffing = self.dovc(['-q', 'diff', '-u', '-D', self.baserev])
        diffing.addCallback(self.readPatch, self.patchlevel)
        return diffing
class SVNExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a Subversion working tree."""
    patchlevel = 0
    vcexe = "svn"

    def getBaseRevision(self):
        """Run 'svn status -u' and parse the base revision out of it."""
        querying = self.dovc(["status", "-u"])
        querying.addCallback(self.parseStatus)
        return querying

    def parseStatus(self, res):
        """Set self.baserev from the 'Status against revision' line.

        Raises IndexError if no such line is present in 'res'.
        """
        # svn shows the base revision for each file that has been modified or
        # which needs an update. You can update each file to a different
        # version, so each file is displayed with its individual base
        # revision. It also shows the repository-wide latest revision number
        # on the last line ("Status against revision: \d+").

        # for our purposes, we use the latest revision number as the "base"
        # revision, and get a diff against that. This means we will get
        # reverse-diffs for local files that need updating, but the resulting
        # tree will still be correct. The only weirdness is that the baserev
        # that we emit may be different than the version of the tree that we
        # first checked out.

        # to do this differently would probably involve scanning the revision
        # numbers to find the max (or perhaps the min) revision, and then
        # using that as a base.
        status_re = re.compile(r'^Status against revision:\s+(\d+)')
        for text in res.split("\n"):
            found = status_re.search(text)
            if found is None:
                continue
            self.baserev = int(found.group(1))
            return
        raise IndexError("Could not find 'Status against revision' in "
                         "SVN output: %s" % res)

    def getPatch(self, res):
        """Diff the tree against the base revision and record the patch."""
        diffing = self.dovc(["diff", "-r%d" % self.baserev])
        diffing.addCallback(self.readPatch, self.patchlevel)
        return diffing
class BazExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a baz working tree."""
    patchlevel = 1
    vcexe = "baz"

    def getBaseRevision(self):
        """Run 'baz tree-id' and parse branch and revision out of it."""
        querying = self.dovc(["tree-id"])
        querying.addCallback(self.parseStatus)
        return querying

    def parseStatus(self, res):
        """Split the tree id into self.branch (between the first '/' and
        the last '--') and self.baserev (after the last '--')."""
        tree_id = res.strip()
        branch_start = tree_id.index("/") + 1
        separator = tree_id.rindex("--")
        self.branch = tree_id[branch_start:separator]
        self.baserev = tree_id[separator+2:]

    def getPatch(self, res):
        """Run 'baz diff' and record the patch."""
        diffing = self.dovc(["diff"])
        diffing.addCallback(self.readPatch, self.patchlevel)
        return diffing
class TlaExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a tla working tree."""
    patchlevel = 1
    vcexe = "tla"

    def getBaseRevision(self):
        """Run 'tla logs --full --reverse' and parse its first line."""
        # 'tla logs --full' gives us ARCHIVE/BRANCH--REVISION
        # 'tla logs' gives us REVISION
        querying = self.dovc(["logs", "--full", "--reverse"])
        querying.addCallback(self.parseStatus)
        return querying

    def parseStatus(self, res):
        """Split the first output line into self.branch (between the first
        '/' and the last '--') and self.baserev (after the last '--')."""
        first_line = res.split("\n")[0]
        tree_id = first_line.strip()
        branch_start = tree_id.index("/") + 1
        separator = tree_id.rindex("--")
        self.branch = tree_id[branch_start:separator]
        self.baserev = tree_id[separator+2:]

    def getPatch(self, res):
        """Run 'tla changes --diffs' and record the patch."""
        diffing = self.dovc(["changes", "--diffs"])
        diffing.addCallback(self.readPatch, self.patchlevel)
        return diffing
class BzrExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a bzr working tree."""
    patchlevel = 0
    vcexe = "bzr"

    def getBaseRevision(self):
        """Run 'bzr version-info' and parse the revno out of it."""
        querying = self.dovc(["version-info"])
        querying.addCallback(self.get_revision_number)
        return querying

    def get_revision_number(self, out):
        """Set self.baserev from the 'revno' line of 'out'.

        Raises ValueError if no 'revno' key is present.
        """
        for entry in out.split("\n"):
            pos = entry.find(":")
            if pos == -1:
                # not a "key: value" line
                continue
            if entry[:pos] == "revno":
                # the value starts two characters past the colon
                self.baserev = int(entry[pos+2:])
                return
        raise ValueError("unable to find revno: in bzr output: '%s'" % out)

    def getPatch(self, res):
        """Run 'bzr diff' and record the patch."""
        diffing = self.dovc(["diff"])
        diffing.addCallback(self.readPatch, self.patchlevel)
        return diffing
class MercurialExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a Mercurial working tree."""
    patchlevel = 1
    vcexe = "hg"

    def getBaseRevision(self):
        """Run 'hg identify' and parse the revision id out of it."""
        d = self.dovc(["identify"])
        d.addCallback(self.parseStatus)
        return d

    def parseStatus(self, output):
        """Set self.baserev to the leading word of the 'hg identify' output.

        Raises ValueError if the output does not start with a word (for
        example when it is empty), instead of failing with an opaque
        AttributeError on the missing match object.
        """
        m = re.search(r'^(\w+)', output)
        if m is None:
            raise ValueError("unable to find revision id in 'hg identify' "
                             "output: '%s'" % output)
        self.baserev = m.group(1)

    def getPatch(self, res):
        """Run 'hg diff' and record the patch."""
        d = self.dovc(["diff"])
        d.addCallback(self.readPatch, self.patchlevel)
        return d
class DarcsExtractor(SourceStampExtractor):
    """Extract a SourceStamp from a darcs working tree."""
    patchlevel = 1
    vcexe = "darcs"

    def getBaseRevision(self):
        """Run 'darcs changes --context' and keep its output as the base
        revision."""
        querying = self.dovc(["changes", "--context"])
        querying.addCallback(self.parseStatus)
        return querying

    def parseStatus(self, res):
        # the base revision is the whole context file, unparsed
        self.baserev = res

    def getPatch(self, res):
        """Run 'darcs diff -u' and record the patch."""
        diffing = self.dovc(["diff", "-u"])
        diffing.addCallback(self.readPatch, self.patchlevel)
        return diffing
def getSourceStamp(vctype, treetop, branch=None):
    """Return a Deferred that fires with a SourceStamp for the tree at
    'treetop', using the extractor that matches 'vctype'.

    Raises KeyError for an unrecognized vctype.
    """
    extractors = {
        "cvs": CVSExtractor,
        "svn": SVNExtractor,
        "baz": BazExtractor,
        "bzr": BzrExtractor,
        "tla": TlaExtractor,
        "hg": MercurialExtractor,
        "darcs": DarcsExtractor,
        }
    extractor_class = extractors.get(vctype)
    if extractor_class is None:
        raise KeyError("unknown vctype '%s'" % vctype)
    return extractor_class(treetop, branch).get()
def ns(s):
    """Encode 's' as '<length>:<s>,'."""
    length = len(s)
    return "%d:%s," % (length, s)
def createJobfile(bsid, branch, baserev, patchlevel, diff, builderNames):
    """Return the job file contents as one string.

    The fields are written in order as '<length>:<text>,' items: the
    literal "1", bsid, branch, str(baserev), the patchlevel as a decimal,
    the diff, and then each builder name.
    """
    fields = ["1", bsid, branch, str(baserev), "%d" % patchlevel, diff]
    fields.extend(builderNames)
    return "".join(["%d:%s," % (len(field), field) for field in fields])
def getTopdir(topfile, start=None):
    """Walk upwards from 'start' (default: the current directory) until a
    directory containing 'topfile' is found, and return that directory.

    Raises ValueError if the filesystem root is reached, or 20 directories
    have been examined, without finding it.
    """
    if not start:
        start = os.getcwd()
    current = start
    for _attempt in range(20):
        if os.path.exists(os.path.join(current, topfile)):
            return current
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() no longer changes the path: we've hit the root
            break
        current = parent
    raise ValueError("Unable to find topfile '%s' anywhere from %s upwards"
                     % (topfile, start))
class RemoteTryPP(protocol.ProcessProtocol):
    """Process protocol that feeds a job to the spawned process's stdin,
    relays its output, and reports the outcome through self.d."""

    def __init__(self, job):
        # fires with (sig, rc) on a clean exit, errbacks otherwise
        self.d = defer.Deferred()
        self.job = job

    def connectionMade(self):
        # send the whole job, then close stdin so the remote end sees EOF
        transport = self.transport
        transport.write(self.job)
        transport.closeStdin()

    def outReceived(self, data):
        sys.stdout.write(data)

    def errReceived(self, data):
        sys.stderr.write(data)

    def processEnded(self, status_object):
        reason = status_object.value
        sig, rc = reason.signal, reason.exitCode
        if sig is None and rc == 0:
            self.d.callback((sig, rc))
            return
        self.d.errback(RuntimeError("remote 'buildbot tryserver' failed"
                                    ": sig=%s, rc=%s" % (sig, rc)))
class BuildSetStatusGrabber:
    """Poll a remote status object until the buildset whose id matches
    ours shows up, or until the retries run out."""
    retryCount = 5 # how many times do we try to grab the BuildSetStatus?
    retryDelay = 3 # seconds to wait between attempts

    def __init__(self, status, bsid):
        self.status = status
        self.bsid = bsid

    def grab(self):
        """Return a Deferred that either fires with the BuildSetStatus
        reference or errbacks because we were unable to grab it."""
        self.d = defer.Deferred()
        # wait a second before querying to give the master's maildir watcher
        # a chance to see the job
        reactor.callLater(1, self.go)
        return self.d

    def go(self, dummy=None):
        """Make one attempt to fetch the buildsets and look for ours."""
        if self.retryCount == 0:
            # go() runs from a timer, so an exception raised here would
            # never reach the caller of grab(); report the failure through
            # the Deferred instead.
            self.d.errback(RuntimeError("couldn't find matching buildset"))
            return
        self.retryCount -= 1
        d = self.status.callRemote("getBuildSets")
        d.addCallback(self._gotSets)
        # a failed remote call must also reach the caller of grab()
        d.addErrback(self.d.errback)

    def _gotSets(self, buildsets):
        """Fire self.d with the matching buildset, or schedule a retry."""
        for bs,bsid in buildsets:
            if bsid == self.bsid:
                # got it
                self.d.callback(bs)
                return
        # not there yet: try again after retryDelay seconds
        reactor.callLater(self.retryDelay, self.go)
class Try(pb.Referenceable):
    """Client side of 'buildbot try'.

    Builds a job from the local tree (or from a supplied diff), delivers
    it to the buildmaster over ssh or PB, and optionally waits for the
    builds and reports their results. run() drives the whole sequence.
    """
    buildsetStatus = None
    quiet = False

    def __init__(self, config):
        # 'config' holds the command-line values; self.opts holds the
        # values returned by runner.loadOptions(). getopt() consults both.
        self.config = config
        self.opts = runner.loadOptions()
        self.connect = self.getopt('connect', 'try_connect')
        assert self.connect, "you must specify a connect style: ssh or pb"
        self.builderNames = self.getopt('builders', 'try_builders')
        assert self.builderNames, "no builders! use --builder or " \
               "try_builders=[names..] in .buildbot/options"

    def getopt(self, config_name, options_name, default=None):
        """Look up a setting: self.config[config_name] first, then
        self.opts[options_name], then 'default'. None and [] both count
        as "not set"."""
        value = self.config.get(config_name)
        if value is None or value == []:
            value = self.opts.get(options_name)
        if value is None or value == []:
            value = default
        return value

    def createJob(self):
        """Return a Deferred which fires when the job parameters have been
        created."""
        # generate a random (unique) string. It would make sense to add a
        # hostname and process ID here, but a) I suspect that would cause
        # windows portability problems, and b) really this is good enough
        self.bsid = "%d-%s" % (time.time(), random.randint(0, 1000000))

        # common options
        branch = self.getopt("branch", "try_branch")

        difffile = self.config.get("diff")
        if difffile:
            # the user supplied the patch: "-" means read it from stdin
            baserev = self.config.get("baserev")
            if difffile == "-":
                diff = sys.stdin.read()
            else:
                f = open(difffile, "r")
                try:
                    diff = f.read()
                finally:
                    # don't leak the file handle
                    f.close()
            patch = (self.config['patchlevel'], diff)
            ss = SourceStamp(branch, baserev, patch)
            d = defer.succeed(ss)
        else:
            vc = self.getopt("vc", "try_vc")
            if vc in ("cvs", "svn"):
                # we need to find the tree-top
                topdir = self.getopt("try_topdir", "try_topdir")
                if topdir:
                    treedir = os.path.expanduser(topdir)
                else:
                    topfile = self.getopt("try-topfile", "try_topfile")
                    treedir = getTopdir(topfile)
            else:
                treedir = os.getcwd()
            d = getSourceStamp(vc, treedir, branch)
        d.addCallback(self._createJob_1)
        return d

    def _createJob_1(self, ss):
        """Remember the SourceStamp; for ssh delivery also build the job
        file that will be written to the remote command's stdin."""
        self.sourcestamp = ss
        if self.connect == "ssh":
            patchlevel, diff = ss.patch
            revspec = ss.revision
            if revspec is None:
                revspec = ""
            self.jobfile = createJobfile(self.bsid,
                                         ss.branch or "", revspec,
                                         patchlevel, diff,
                                         self.builderNames)

    def deliverJob(self):
        """Return a Deferred that fires when the job has been delivered.

        Raises RuntimeError if self.connect is neither 'ssh' nor 'pb'.
        """
        if self.connect == "ssh":
            tryhost = self.getopt("tryhost", "try_host")
            tryuser = self.getopt("username", "try_username")
            trydir = self.getopt("trydir", "try_dir")

            argv = ["ssh", "-l", tryuser, tryhost,
                    "buildbot", "tryserver", "--jobdir", trydir]
            # now run this command and feed the contents of 'job' into stdin
            pp = RemoteTryPP(self.jobfile)
            reactor.spawnProcess(pp, argv[0], argv, os.environ)
            return pp.d
        if self.connect == "pb":
            user = self.getopt("username", "try_username")
            passwd = self.getopt("passwd", "try_password")
            master = self.getopt("master", "try_master")
            tryhost, tryport = master.split(":")
            tryport = int(tryport)
            f = pb.PBClientFactory()
            d = f.login(credentials.UsernamePassword(user, passwd))
            reactor.connectTCP(tryhost, tryport, f)
            d.addCallback(self._deliverJob_pb)
            return d
        raise RuntimeError("unknown connecttype '%s', should be 'ssh' or 'pb'"
                           % self.connect)

    def _deliverJob_pb(self, remote):
        """Submit the job through the remote 'try' method."""
        ss = self.sourcestamp
        d = remote.callRemote("try",
                              ss.branch, ss.revision, ss.patch,
                              self.builderNames)
        d.addCallback(self._deliverJob_pb2)
        return d

    def _deliverJob_pb2(self, status):
        # remember the status reference so getStatus() and cleanup() can
        # use it
        self.buildsetStatus = status
        return status

    def getStatus(self):
        """Return a Deferred that fires when the builds have finished, and
        which may emit status messages while we wait. Returns None (after
        printing a notice) when waiting was not requested."""
        wait = bool(self.getopt("wait", "try_wait", False))
        if not wait:
            # TODO: emit the URL where they can follow the builds. This
            # requires contacting the Status server over PB and doing
            # getURLForThing() on the BuildSetStatus. To get URLs for
            # individual builds would require we wait for the builds to
            # start.
            print("not waiting for builds to finish")
            return
        self.running = defer.Deferred()
        if self.buildsetStatus:
            # we already hold the BuildSetStatus (set by _deliverJob_pb2),
            # so there is no need to contact the status port
            self._getStatus_1()
            return self.running
        # contact the status port
        # we're probably using the ssh style
        master = self.getopt("master", "masterstatus")
        host, port = master.split(":")
        port = int(port)
        self.announce("contacting the status port at %s:%d" % (host, port))
        f = pb.PBClientFactory()
        creds = credentials.UsernamePassword("statusClient", "clientpw")
        d = f.login(creds)
        reactor.connectTCP(host, port, f)
        d.addCallback(self._getStatus_ssh_1)
        # a failed login or buildset lookup must end the wait instead of
        # leaving self.running unfired
        d.addErrback(self.running.errback)
        return self.running

    def _getStatus_ssh_1(self, remote):
        """Find a remote reference to the corresponding BuildSetStatus
        object."""
        self.announce("waiting for job to be accepted")
        g = BuildSetStatusGrabber(remote, self.bsid)
        d = g.grab()
        d.addCallback(self._getStatus_1)
        return d

    def _getStatus_1(self, res=None):
        """Ask the BuildSetStatus for its BuildRequests. 'res', when given,
        is the BuildSetStatus reference to use."""
        if res:
            self.buildsetStatus = res
        # gather the set of BuildRequests
        d = self.buildsetStatus.callRemote("getBuildRequests")
        d.addCallback(self._getStatus_2)
        # report failures to whoever is waiting on getStatus()
        d.addErrback(self.running.errback)

    def _getStatus_2(self, brs):
        """Reset the per-builder bookkeeping, subscribe to every
        BuildRequest, and start the periodic status display."""
        self.builderNames = []
        self.buildRequests = {}

        # self.builds holds the current BuildStatus object for each one
        self.builds = {}

        # self.outstanding holds the list of builderNames which haven't
        # finished yet
        self.outstanding = []

        # self.results holds the build results, one [result, text] pair
        # per builder
        self.results = {}

        # self.currentStep holds the name of the Step that each build is
        # currently running
        self.currentStep = {}

        # self.ETA holds the expected finishing time (absolute time since
        # epoch)
        self.ETA = {}

        for n,br in brs:
            self.builderNames.append(n)
            self.buildRequests[n] = br
            self.builds[n] = None
            self.outstanding.append(n)
            self.results[n] = [None,None]
            self.currentStep[n] = None
            self.ETA[n] = None
            # get new Builds for this buildrequest. We follow each one until
            # it finishes or is interrupted.
            br.callRemote("subscribe", self)

        # now that those queries are in transit, we can start the
        # display-status-every-3-seconds loop
        self.printloop = task.LoopingCall(self.printStatus)
        self.printloop.start(3, now=False)


    # these methods are invoked by the status objects we've subscribed to

    def remote_newbuild(self, bs, builderName):
        """Follow the new build 'bs', dropping any build we were following
        for this builder before."""
        if self.builds[builderName]:
            self.builds[builderName].callRemote("unsubscribe", self)
        self.builds[builderName] = bs
        # NOTE(review): the 20 is presumably an update interval in
        # seconds -- confirm against the remote subscribe() signature
        bs.callRemote("subscribe", self, 20)
        d = bs.callRemote("waitUntilFinished")
        d.addCallback(self._build_finished, builderName)

    def remote_stepStarted(self, buildername, build, stepname, step):
        self.currentStep[buildername] = stepname

    def remote_stepFinished(self, buildername, build, stepname, step, results):
        pass

    def remote_buildETAUpdate(self, buildername, build, eta):
        # 'eta' is relative; store it as an absolute time
        self.ETA[buildername] = now() + eta

    def _build_finished(self, bs, builderName):
        # we need to collect status from the newly-finished build. We don't
        # remove the build from self.outstanding until we've collected
        # everything we want.
        self.builds[builderName] = None
        self.ETA[builderName] = None
        self.currentStep[builderName] = "finished"
        d = bs.callRemote("getResults")
        d.addCallback(self._build_finished_2, bs, builderName)
        return d

    def _build_finished_2(self, results, bs, builderName):
        self.results[builderName][0] = results
        d = bs.callRemote("getText")
        d.addCallback(self._build_finished_3, builderName)
        return d

    def _build_finished_3(self, text, builderName):
        self.results[builderName][1] = text

        self.outstanding.remove(builderName)
        if not self.outstanding:
            # all done
            return self.statusDone()

    def printStatus(self):
        """Announce one status line per builder, followed by a blank
        line."""
        names = list(self.buildRequests.keys())
        names.sort()
        for n in names:
            if n not in self.outstanding:
                # the build is finished, and we have results
                code,text = self.results[n]
                t = builder.Results[code]
                if text:
                    t += " (%s)" % " ".join(text)
            elif self.builds[n]:
                t = self.currentStep[n] or "building"
                if self.ETA[n]:
                    t += " [ETA %ds]" % (self.ETA[n] - now())
            else:
                t = "no build"
            self.announce("%s: %s" % (n, t))
        self.announce("")

    def statusDone(self):
        """Stop the status loop, print the final result of every build, and
        fire self.running with the exit code: 0 if every build's result
        code equals builder.SUCCESS, otherwise 1."""
        self.printloop.stop()
        print("All Builds Complete")
        # TODO: include a URL for all failing builds
        names = list(self.buildRequests.keys())
        names.sort()
        happy = True
        for n in names:
            code,text = self.results[n]
            t = "%s: %s" % (n, builder.Results[code])
            if text:
                t += " (%s)" % " ".join(text)
            print(t)
            # compare the result code itself: self.results[n] is the whole
            # [code, text] pair and would never equal SUCCESS
            if code != builder.SUCCESS:
                happy = False

        if happy:
            self.exitcode = 0
        else:
            self.exitcode = 1
        self.running.callback(self.exitcode)

    def announce(self, message):
        """Print 'message' unless self.quiet is set."""
        if not self.quiet:
            print(message)

    def run(self):
        """Create the job, deliver it, optionally wait for the results,
        then stop the reactor and exit with self.exitcode."""
        # we can't do spawnProcess until we're inside reactor.run(), so get
        # funky
        print("using '%s' connect method" % self.connect)
        self.exitcode = 0
        d = defer.Deferred()
        d.addCallback(lambda res: self.createJob())
        d.addCallback(lambda res: self.announce("job created"))
        d.addCallback(lambda res: self.deliverJob())
        d.addCallback(lambda res: self.announce("job has been delivered"))
        d.addCallback(lambda res: self.getStatus())
        # logErr both logs the failure and tells the user about it
        d.addErrback(self.logErr)
        d.addCallback(self.cleanup)
        d.addCallback(lambda res: reactor.stop())

        reactor.callLater(0, d.callback, None)
        reactor.run()
        sys.exit(self.exitcode)

    def logErr(self, why):
        """Log the failure 'why' and also print it for the user."""
        log.err(why)
        print("error during 'try' processing")
        print(why)

    def cleanup(self, res=None):
        """Drop the connection behind the BuildSetStatus reference, if we
        hold one."""
        if self.buildsetStatus:
            self.buildsetStatus.broker.transport.loseConnection()