1 # -*- test-case-name: buildbot.test.test_control -*-
5 from twisted
.trial
import unittest
6 from twisted
.internet
import defer
8 from buildbot
import master
, interfaces
9 from buildbot
.sourcestamp
import SourceStamp
10 from buildbot
.slave
import bot
11 from buildbot
.status
.builder
import SUCCESS
12 from buildbot
.process
import base
13 from buildbot
.test
.runutils
import rmtree
16 from buildbot.process import factory
17 from buildbot.steps import dummy
18 from buildbot.buildslave import BuildSlave
def s(klass, **kwargs):
    """Bundle a step class with the keyword arguments given alongside it.

    Returns the two-element tuple ``(klass, kwargs)``, where ``kwargs`` is
    the dict of keyword arguments passed to this call.
    """
    step_spec = klass, kwargs
    return step_spec
# Build factory with a single step: dummy.Dummy created with timeout=1.
f1 = factory.BuildFactory([
    s(dummy.Dummy, timeout=1),
])  # the step list and the call must be closed before the next statement

# NOTE(review): `c` is not assigned anywhere in the visible lines; confirm it
# is created (presumably as a dict) before these item assignments run.
c['slaves'] = [BuildSlave('bot1', 'sekrit')]
c['builders'] = [{'name': 'force', 'slavename': 'bot1',
                  'builddir': 'force-dir', 'factory': f1}]
37 def getSlaveCommandVersion(self
, command
, oldversion
=None):
41 class Force(unittest
.TestCase
):
49 self
.rmtree("control_basedir")
50 os
.mkdir("control_basedir")
51 self
.master
= master
.BuildMaster("control_basedir")
52 self
.slavebase
= os
.path
.abspath("control_slavebase")
53 self
.rmtree(self
.slavebase
)
54 os
.mkdir("control_slavebase")
def connectSlave(self):
    """Create a build slave, start it, and wait for the "force" builder.

    The slave is pointed at "localhost" on the port reported by
    ``self.master.slavePort._port.getHost()`` and is given
    ``self.slavebase`` as an argument, with ``keepalive=0`` and ``usePTY=1``.

    Returns the Deferred obtained from
    ``self.master.botmaster.waitUntilBuilderAttached("force")``.
    """
    port = self.master.slavePort._port.getHost().port
    slave = bot.BuildSlave("localhost", port, "bot1", "sekrit",
                           self.slavebase, keepalive=0, usePTY=1)
    # Keep a reference on the test case: cleanup code elsewhere in this
    # class calls self.slave.stopService.
    self.slave = slave
    # NOTE(review): presumably the slave only connects once its service has
    # been started -- confirm against bot.BuildSlave.
    slave.startService()
    d = self.master.botmaster.waitUntilBuilderAttached("force")
    # testRequest chains callbacks onto this Deferred, so it must be returned.
    return d
68 dl
.append(self
.master
.botmaster
.waitUntilBuilderDetached("force"))
69 dl
.append(defer
.maybeDeferred(self
.slave
.stopService
))
71 dl
.append(defer
.maybeDeferred(self
.master
.stopService
))
72 return defer
.DeferredList(dl
)
74 def testRequest(self
):
78 d
= self
.connectSlave()
79 d
.addCallback(self
._testRequest
_1)
def _testRequest_1(self, res):
    """Submit a build request to the "force" builder.

    ``res`` is the result passed along by the previous callback; it is not
    used here.

    Returns a Deferred whose ``callback`` is subscribed to the request and
    which has ``_testRequest_2`` attached as its next step.
    """
    c = interfaces.IControl(self.master)
    req = base.BuildRequest("I was bored", SourceStamp())
    builder_control = c.getBuilder("force")
    # The request is given a callable to notify, so create a fresh Deferred
    # for it; without this assignment `d` was unbound at the subscribe call.
    d = defer.Deferred()
    req.subscribe(d.callback)
    builder_control.requestBuild(req)
    d.addCallback(self._testRequest_2)
    # we use the same check-the-results code as testForce
    # Return the chain so the caller's Deferred waits on it.
    return d
def _testRequest_2(self, build_control):
    """Check the build control object and wait for its build to finish.

    Asserts that ``build_control`` provides ``interfaces.IBuildControl``,
    then asks its status object for a Deferred via ``waitUntilFinished()``
    and attaches ``_testRequest_3`` to it.

    Returns that Deferred so the enclosing callback chain waits for the
    final checks instead of dropping them.
    """
    self.failUnless(interfaces.IBuildControl.providedBy(build_control))
    d = build_control.getStatus().waitUntilFinished()
    d.addCallback(self._testRequest_3)
    return d
def _testRequest_3(self, bs):
    """Verify the status object delivered at the end of the callback chain.

    Checks, in order, that ``bs`` provides ``interfaces.IBuildStatus``,
    that ``bs.isFinished()`` is true, that ``bs.getResults()`` equals
    SUCCESS, and that ``bs.getChanges()`` equals the empty tuple.
    """
    provides_status = interfaces.IBuildStatus.providedBy(bs)
    self.failUnless(provides_status)

    finished = bs.isFinished()
    self.failUnless(finished)

    results = bs.getResults()
    self.failUnlessEqual(results, SUCCESS)
    #self.failUnlessEqual(bs.getResponsibleUsers(), ["bob"]) # TODO

    changes = bs.getChanges()
    self.failUnlessEqual(changes, ())
    #self.failUnlessEqual(bs.getReason(), "forced") # TODO