1 # -*- test-case-name: buildbot.test.test_control -*-
5 from twisted
.trial
import unittest
6 from twisted
.internet
import defer
8 from buildbot
import master
, interfaces
9 from buildbot
.sourcestamp
import SourceStamp
10 from buildbot
.twcompat
import providedBy
11 from buildbot
.slave
import bot
12 from buildbot
.status
.builder
import SUCCESS
13 from buildbot
.process
import base
14 from buildbot
.test
.runutils
import rmtree
17 from buildbot.process import factory
18 from buildbot.steps import dummy
def s(klass, **kwargs):
    """Pair a class with the keyword arguments supplied alongside it.

    Returns the 2-tuple ``(klass, kwargs)``, where ``kwargs`` is the dict
    of keyword arguments passed to this call (empty when there are none).
    """
    spec = (klass, kwargs)
    return spec
23 f1 = factory.BuildFactory([
24 s(dummy.Dummy, timeout=1),
27 c['bots'] = [['bot1', 'sekrit']]
30 c['builders'] = [{'name': 'force', 'slavename': 'bot1',
31 'builddir': 'force-dir', 'factory': f1}]
38 def getSlaveCommandVersion(self
, command
, oldversion
=None):
42 class Force(unittest
.TestCase
):
50 self
.rmtree("control_basedir")
51 os
.mkdir("control_basedir")
52 self
.master
= master
.BuildMaster("control_basedir")
53 self
.slavebase
= os
.path
.abspath("control_slavebase")
54 self
.rmtree(self
.slavebase
)
55 os
.mkdir("control_slavebase")
def connectSlave(self):
    """Create the "bot1" build slave and wait for the "force" builder.

    The slave is pointed at "localhost" on the port the master's slave
    port is currently bound to, and uses ``self.slavebase`` as its base
    directory.  It is kept on ``self.slave`` because the cleanup code in
    this class stops it through ``self.slave.stopService``.

    Returns the Deferred produced by
    ``botmaster.waitUntilBuilderAttached("force")``; ``testRequest``
    chains its first callback onto this return value.
    """
    # Read the port number back from the listening port rather than
    # assuming a fixed one.
    # NOTE(review): requires the master to be listening already -- confirm
    # that callers start it first.
    port = self.master.slavePort._port.getHost().port
    slave = bot.BuildSlave("localhost", port, "bot1", "sekrit",
                           self.slavebase, keepalive=0, usePTY=1)
    # Remember the slave so cleanup can call stopService on it.
    self.slave = slave
    # NOTE(review): the slave presumably only connects once its service is
    # running; stopService in the cleanup code is the matching call.
    slave.startService()
    d = self.master.botmaster.waitUntilBuilderAttached("force")
    return d
69 dl
.append(self
.master
.botmaster
.waitUntilBuilderDetached("force"))
70 dl
.append(defer
.maybeDeferred(self
.slave
.stopService
))
72 dl
.append(defer
.maybeDeferred(self
.master
.stopService
))
73 return defer
.DeferredList(dl
)
def testRequest(self):
    """Connect the slave, then run the build-request checks.

    Returns the Deferred from ``connectSlave`` with ``_testRequest_1``
    added as a callback.  Returning it lets the trial runner wait for the
    whole chain and report failures raised inside the callbacks; without
    the return the test would end as soon as this method did.
    """
    # NOTE(review): connectSlave reads the master's slave port, so the
    # master is assumed to be configured and running here -- confirm.
    d = self.connectSlave()
    d.addCallback(self._testRequest_1)
    return d
def _testRequest_1(self, res):
    """Submit a build request to the "force" builder.

    ``res`` is the result of the previous callback and is not used.

    Returns a Deferred that is fired through the request's subscription
    and then continues with ``_testRequest_2``.
    """
    control = interfaces.IControl(self.master)
    req = base.BuildRequest("I was bored", SourceStamp())
    builder_control = control.getBuilder("force")
    # The request reports back by calling its subscribers, so hand it the
    # callback of a fresh Deferred before the build is requested.
    d = defer.Deferred()
    req.subscribe(d.callback)
    builder_control.requestBuild(req)
    d.addCallback(self._testRequest_2)
    # we use the same check-the-results code as testForce
    # Return the Deferred so the caller's chain waits for the checks.
    return d
def _testRequest_2(self, build_control):
    """Verify the build control object and wait for its build to finish.

    ``build_control`` must provide ``interfaces.IBuildControl``.

    Returns the Deferred from ``getStatus().waitUntilFinished()`` with
    ``_testRequest_3`` added as a callback.  Returning it keeps the final
    status checks attached to the calling Deferred chain, so a failure in
    them is not silently dropped.
    """
    self.failUnless(providedBy(build_control, interfaces.IBuildControl))
    d = build_control.getStatus().waitUntilFinished()
    d.addCallback(self._testRequest_3)
    return d
def _testRequest_3(self, bs):
    """Check the status object of the finished build.

    ``bs`` must provide ``interfaces.IBuildStatus``, report itself as
    finished, have ``SUCCESS`` as its results and carry no changes.
    """
    is_build_status = providedBy(bs, interfaces.IBuildStatus)
    self.failUnless(is_build_status)

    finished = bs.isFinished()
    self.failUnless(finished)

    results = bs.getResults()
    self.failUnlessEqual(results, SUCCESS)
    #self.failUnlessEqual(bs.getResponsibleUsers(), ["bob"]) # TODO

    changes = bs.getChanges()
    self.failUnlessEqual(changes, [])
    #self.failUnlessEqual(bs.getReason(), "forced") # TODO