1 # -*- test-case-name: buildbot.test.test_control -*-
5 from twisted
.trial
import unittest
6 from twisted
.internet
import defer
8 from buildbot
import master
, interfaces
9 from buildbot
.sourcestamp
import SourceStamp
10 from buildbot
.twcompat
import providedBy
, maybeWait
11 from buildbot
.slave
import bot
12 from buildbot
.status
.builder
import SUCCESS
13 from buildbot
.process
import base
14 from buildbot
.test
.runutils
import rmtree
17 from buildbot.process import factory
18 from buildbot.steps import dummy
20 def s(klass, **kwargs):
21 return (klass, kwargs)
23 f1 = factory.BuildFactory([
24 s(dummy.Dummy, timeout=1),
27 c['bots'] = [['bot1', 'sekrit']]
30 c['builders'] = [{'name': 'force', 'slavename': 'bot1',
31 'builddir': 'force-dir', 'factory': f1}]
38 def getSlaveCommandVersion(self
, command
, oldversion
=None):
42 class Force(unittest
.TestCase
):
50 self
.rmtree("control_basedir")
51 os
.mkdir("control_basedir")
52 self
.master
= master
.BuildMaster("control_basedir")
53 self
.slavebase
= os
.path
.abspath("control_slavebase")
54 self
.rmtree(self
.slavebase
)
55 os
.mkdir("control_slavebase")
57 def connectSlave(self
):
58 port
= self
.master
.slavePort
._port
.getHost().port
59 slave
= bot
.BuildSlave("localhost", port
, "bot1", "sekrit",
60 self
.slavebase
, keepalive
=0, usePTY
=1)
63 d
= self
.master
.botmaster
.waitUntilBuilderAttached("force")
69 dl
.append(self
.master
.botmaster
.waitUntilBuilderDetached("force"))
70 dl
.append(defer
.maybeDeferred(self
.slave
.stopService
))
72 dl
.append(defer
.maybeDeferred(self
.master
.stopService
))
73 return maybeWait(defer
.DeferredList(dl
))
76 # TODO: since BuilderControl.forceBuild has been deprecated, this
77 # test is scheduled to be removed soon
81 d
= self
.connectSlave()
82 d
.addCallback(self
._testForce
_1)
85 def _testForce_1(self
, res
):
86 c
= interfaces
.IControl(self
.master
)
87 builder_control
= c
.getBuilder("force")
88 d
= builder_control
.forceBuild("bob", "I was bored")
89 d
.addCallback(self
._testForce
_2)
92 def _testForce_2(self
, build_control
):
93 self
.failUnless(providedBy(build_control
, interfaces
.IBuildControl
))
94 d
= build_control
.getStatus().waitUntilFinished()
95 d
.addCallback(self
._testForce
_3)
98 def _testForce_3(self
, bs
):
99 self
.failUnless(providedBy(bs
, interfaces
.IBuildStatus
))
100 self
.failUnless(bs
.isFinished())
101 self
.failUnlessEqual(bs
.getResults(), SUCCESS
)
102 #self.failUnlessEqual(bs.getResponsibleUsers(), ["bob"]) # TODO
103 self
.failUnlessEqual(bs
.getChanges(), [])
104 #self.failUnlessEqual(bs.getReason(), "forced") # TODO
106 def testRequest(self
):
110 d
= self
.connectSlave()
111 d
.addCallback(self
._testRequest
_1)
113 def _testRequest_1(self
, res
):
114 c
= interfaces
.IControl(self
.master
)
115 req
= base
.BuildRequest("I was bored", SourceStamp())
116 builder_control
= c
.getBuilder("force")
118 req
.subscribe(d
.callback
)
119 builder_control
.requestBuild(req
)
120 d
.addCallback(self
._testForce
_2)
121 # we use the same check-the-results code as testForce