[Buildbot-commits] buildbot/buildbot/test test_web.py,1.46,1.47
Brian Warner
warner at users.sourceforge.net
Wed Aug 1 22:08:40 UTC 2007
Update of /cvsroot/buildbot/buildbot/buildbot/test
In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv29260/buildbot/test
Modified Files:
test_web.py
Log Message:
[project @ web-refactoring: fix tests]
Original author: warner at lothar.com
Date: 2007-07-31 02:37:33+00:00
Index: test_web.py
===================================================================
RCS file: /cvsroot/buildbot/buildbot/buildbot/test/test_web.py,v
retrieving revision 1.46
retrieving revision 1.47
diff -u -d -r1.46 -r1.47
--- test_web.py 1 Aug 2007 20:31:27 -0000 1.46
+++ test_web.py 1 Aug 2007 22:08:37 -0000 1.47
@@ -117,69 +117,82 @@
d = self.master.stopService()
return d
+ def find_webstatus(self, master):
+ for child in list(master):
+ if isinstance(child, html.WebStatus):
+ return child
+
def find_waterfall(self, master):
- return filter(lambda child: isinstance(child, html.Waterfall),
- list(master))
+ for child in list(master):
+ if isinstance(child, html.Waterfall):
+ return child
class Ports(BaseWeb, unittest.TestCase):
def test_webPortnum(self):
# run a regular web server on a TCP socket
- config = base_config + "c['status'] = [html.Waterfall(http_port=0)]\n"
+ config = base_config + "c['status'] = [html.WebStatus(http_port=0)]\n"
os.mkdir("test_web1")
self.master = m = ConfiguredMaster("test_web1", config)
m.startService()
# hack to find out what randomly-assigned port it is listening on
- port = list(self.find_waterfall(m)[0])[0]._port.getHost().port
+ port = self.find_webstatus(m).getPortnum()
- d = client.getPage("http://localhost:%d/" % port)
- d.addCallback(self._test_webPortnum_1)
+ d = client.getPage("http://localhost:%d/waterfall" % port)
+ def _check(page):
+ #print page
+ self.failUnless(page)
+ d.addCallback(_check)
return d
test_webPortnum.timeout = 10
- def _test_webPortnum_1(self, page):
- #print page
- self.failUnless(page)
def test_webPathname(self):
# running a t.web.distrib server over a UNIX socket
if not IReactorUNIX.providedBy(reactor):
raise unittest.SkipTest("UNIX sockets not supported here")
config = (base_config +
- "c['status'] = [html.Waterfall(distrib_port='.web-pb')]\n")
+ "c['status'] = [html.WebStatus(distrib_port='.web-pb')]\n")
os.mkdir("test_web2")
self.master = m = ConfiguredMaster("test_web2", config)
m.startService()
p = DistribUNIX("test_web2/.web-pb")
- d = client.getPage("http://localhost:%d/remote/" % p.portnum)
- d.addCallback(self._test_webPathname_1, p)
+ d = client.getPage("http://localhost:%d/remote/waterfall" % p.portnum)
+ def _check(page):
+ self.failUnless(page)
+ d.addCallback(_check)
+ def _done(res):
+ d1 = p.shutdown()
+ d1.addCallback(lambda x: res)
+ return d1
+ d.addBoth(_done)
return d
test_webPathname.timeout = 10
- def _test_webPathname_1(self, page, p):
- #print page
- self.failUnless(page)
- return p.shutdown()
def test_webPathname_port(self):
# running a t.web.distrib server over TCP
config = (base_config +
- "c['status'] = [html.Waterfall(distrib_port=0)]\n")
+ "c['status'] = [html.WebStatus(distrib_port=0)]\n")
os.mkdir("test_web3")
self.master = m = ConfiguredMaster("test_web3", config)
m.startService()
- dport = list(self.find_waterfall(m)[0])[0]._port.getHost().port
+ dport = self.find_webstatus(m).getPortnum()
p = DistribTCP(dport)
- d = client.getPage("http://localhost:%d/remote/" % p.portnum)
- d.addCallback(self._test_webPathname_port_1, p)
+ d = client.getPage("http://localhost:%d/remote/waterfall" % p.portnum)
+ def _check(page):
+ self.failUnlessIn("BuildBot", page)
+ d.addCallback(_check)
+ def _done(res):
+ d1 = p.shutdown()
+ d1.addCallback(lambda x: res)
+ return d1
+ d.addBoth(_done)
return d
test_webPathname_port.timeout = 10
- def _test_webPathname_port_1(self, page, p):
- self.failUnlessIn("BuildBot", page)
- return p.shutdown()
class Waterfall(BaseWeb, unittest.TestCase):
@@ -201,50 +214,45 @@
self.master = m = ConfiguredMaster("test_web4", config1)
m.startService()
- # hack to find out what randomly-assigned port it is listening on
- port = list(self.find_waterfall(m)[0])[0]._port.getHost().port
+ port = self.find_waterfall(m).getPortnum()
self.port = port
# insert an event
m.change_svc.addChange(Change("user", ["foo.c"], "comments"))
d = client.getPage("http://localhost:%d/" % port)
- d.addCallback(self._test_waterfall_1)
- return d
- test_waterfall.timeout = 10
- def _test_waterfall_1(self, page):
- self.failUnless(page)
- self.failUnlessIn("current activity", page)
- self.failUnlessIn("<html", page)
- TZ = time.tzname[time.daylight]
- self.failUnlessIn("time (%s)" % TZ, page)
- # phase=0 is really for debugging the waterfall layout
- d = client.getPage("http://localhost:%d/?phase=0" % self.port)
- d.addCallback(self._test_waterfall_2)
- return d
- def _test_waterfall_2(self, page):
- self.failUnless(page)
- self.failUnlessIn("<html", page)
+ def _check1(page):
+ self.failUnless(page)
+ self.failUnlessIn("current activity", page)
+ self.failUnlessIn("<html", page)
+ TZ = time.tzname[time.daylight]
+ self.failUnlessIn("time (%s)" % TZ, page)
- d = client.getPage("http://localhost:%d/favicon.ico" % self.port)
- d.addCallback(self._test_waterfall_3)
- return d
- def _test_waterfall_3(self, icon):
- expected = open(waterfall.buildbot_icon,"rb").read()
- self.failUnless(icon == expected)
+ # phase=0 is really for debugging the waterfall layout
+ return client.getPage("http://localhost:%d/?phase=0" % self.port)
+ d.addCallback(_check1)
- d = client.getPage("http://localhost:%d/changes" % self.port)
- d.addCallback(self._test_waterfall_4)
- return d
- def _test_waterfall_4(self, changes):
- self.failUnlessIn("<li>Syncmail mailing list in maildir " +
- "my-maildir</li>", changes)
+ def _check2(page):
+ self.failUnless(page)
+ self.failUnlessIn("<html", page)
+
+ return client.getPage("http://localhost:%d/changes" % self.port)
+ d.addCallback(_check2)
+
+ def _check3(changes):
+ self.failUnlessIn("<li>Syncmail mailing list in maildir " +
+ "my-maildir</li>", changes)
+
+ return client.getPage("http://localhost:%d/robots.txt" % self.port)
+ d.addCallback(_check3)
+
+ def _check4(robotstxt):
+ self.failUnless(robotstxt == self.robots_txt_contents)
+ d.addCallback(_check4)
- d = client.getPage("http://localhost:%d/robots.txt" % self.port)
- d.addCallback(self._test_waterfall_5)
return d
- def _test_waterfall_5(self, robotstxt):
- self.failUnless(robotstxt == self.robots_txt_contents)
+
+ test_waterfall.timeout = 10
class WaterfallSteps(unittest.TestCase):
@@ -393,7 +401,7 @@
'builders': [{'name': 'builder1', 'slavename': 'bot1',
'builddir':'workdir', 'factory':f1}],
'slavePortnum': 0,
- 'status': [html.Waterfall(http_port=0)],
+ 'status': [html.WebStatus(http_port=0)],
}
"""
if os.path.exists("test_logfile"):
@@ -402,7 +410,7 @@
self.master = m = ConfiguredMaster("test_logfile", config)
m.startService()
# hack to find out what randomly-assigned port it is listening on
- port = list(self.find_waterfall(m)[0])[0]._port.getHost().port
+ port = self.find_webstatus(m).getPortnum()
self.port = port
# insert an event
@@ -437,42 +445,44 @@
step1.step_status.stepFinished(builder.SUCCESS)
bs.buildFinished()
- def getLogURL(self, stepname, lognum):
- logurl = "http://localhost:%d/builder1/builds/0/step-%s/%d" \
- % (self.port, stepname, lognum)
- return logurl
+ def getLogPath(self, stepname, logname):
+ return ("/builders/builder1/builds/0/steps/%s/logs/%s" %
+ (stepname, logname))
+
+ def getLogURL(self, stepname, logname):
+ return ("http://localhost:%d" % self.port
+ + self.getLogPath(stepname, logname))
def test_logfile1(self):
d = client.getPage("http://localhost:%d/" % self.port)
- d.addCallback(self._test_logfile1_1)
+ def _check(page):
+ self.failUnless(page)
+ d.addCallback(_check)
return d
- test_logfile1.timeout = 20
- def _test_logfile1_1(self, page):
- self.failUnless(page)
def test_logfile2(self):
- logurl = self.getLogURL("setup", 0)
+ logurl = self.getLogURL("setup", "output")
d = client.getPage(logurl)
- d.addCallback(self._test_logfile2_1)
+ def _check(logbody):
+ self.failUnless(logbody)
+ d.addCallback(_check)
return d
- def _test_logfile2_1(self, logbody):
- self.failUnless(logbody)
def test_logfile3(self):
- logurl = self.getLogURL("setup", 0)
+ logurl = self.getLogURL("setup", "output")
d = client.getPage(logurl + "/text")
- d.addCallback(self._test_logfile3_1)
+ def _check(logtext):
+ self.failUnlessEqual(logtext, "some stdout\n")
+ d.addCallback(_check)
return d
- def _test_logfile3_1(self, logtext):
- self.failUnlessEqual(logtext, "some stdout\n")
def test_logfile4(self):
- logurl = self.getLogURL("setup", 1)
+ logurl = self.getLogURL("setup", "error")
d = client.getPage(logurl)
- d.addCallback(self._test_logfile4_1)
+ def _check(logbody):
+ self.failUnlessEqual(logbody, "<html>ouch</html>")
+ d.addCallback(_check)
return d
- def _test_logfile4_1(self, logbody):
- self.failUnlessEqual(logbody, "<html>ouch</html>")
def test_logfile5(self):
# this is log3, which is about 1MB in size, made up of alternating
@@ -480,33 +490,33 @@
# twisted-1.3.0, fails to resume sending chunks after the client
# stalls for a few seconds, because of a recursive doWrite() call
# that was fixed in twisted-2.0.0
- p = SlowReader("GET /builder1/builds/0/step-setup/2 HTTP/1.0\r\n\r\n")
- f = CFactory(p)
- c = reactor.connectTCP("localhost", self.port, f)
+ p = SlowReader("GET %s HTTP/1.0\r\n\r\n"
+ % self.getLogPath("setup", "big"))
+ cf = CFactory(p)
+ c = reactor.connectTCP("localhost", self.port, cf)
d = p.d
- d.addCallback(self._test_logfile5_1, p)
+ def _check(res):
+ self.failUnlessIn("big log", p.data)
+ self.failUnlessIn("a"*100, p.data)
+ self.failUnless(p.count > 1*1000*1000)
+ d.addCallback(_check)
return d
- test_logfile5.timeout = 10
- def _test_logfile5_1(self, res, p):
- self.failUnlessIn("big log", p.data)
- self.failUnlessIn("a"*100, p.data)
- self.failUnless(p.count > 1*1000*1000)
def test_logfile6(self):
# this is log4, which is about 1MB in size, one big chunk.
# buildbot-0.6.6 dies as the NetstringReceiver barfs on the
# saved logfile, because it was using one big chunk and exceeding
# NetstringReceiver.MAX_LENGTH
- p = SlowReader("GET /builder1/builds/0/step-setup/3 HTTP/1.0\r\n\r\n")
- f = CFactory(p)
- c = reactor.connectTCP("localhost", self.port, f)
+ p = SlowReader("GET %s HTTP/1.0\r\n\r\n"
+ % self.getLogPath("setup", "bigcomplete"))
+ cf = CFactory(p)
+ c = reactor.connectTCP("localhost", self.port, cf)
d = p.d
- d.addCallback(self._test_logfile6_1, p)
+ def _check(res):
+ self.failUnlessIn("big2 log", p.data)
+ self.failUnlessIn("a"*100, p.data)
+ self.failUnless(p.count > 1*1000*1000)
+ d.addCallback(_check)
return d
- test_logfile6.timeout = 10
- def _test_logfile6_1(self, res, p):
- self.failUnlessIn("big2 log", p.data)
- self.failUnlessIn("a"*100, p.data)
- self.failUnless(p.count > 1*1000*1000)
More information about the Commits
mailing list