3 from basetest
import BaseTest
6 sys
.path
.insert(0, '..')
8 from zeroinstall
.injector
import scheduler
9 from zeroinstall
.support
import tasks
# Hold on to the scheduler's genuine thread spawner so that a test which
# replaces scheduler._spawn_thread can restore it afterwards.
real_spawn = scheduler._spawn_thread
def __init__(self, url, downloads):
    """Create a stand-in download for *url*.

    Args:
        url: Address this fake download represents. It labels the abort
            blocker and is the key under which the result is recorded.
        downloads: Shared mapping in which ``_finish`` stores this
            download's final status, keyed by URL.
    """
    # Blocker named after the URL; presumably triggered if the download
    # is aborted (nothing in this file triggers it) -- TODO confirm.
    self._aborted = tasks.Blocker("abort " + url)
    # _finish() files its result under self.url, so the URL must be kept
    # on the instance; without it _finish() raises AttributeError.
    self.url = url
    self.downloads = downloads
19 def _finish(self
, status
):
20 self
.downloads
[self
.url
] = status
# Test case for the per-site download queue in zeroinstall.injector.scheduler.
# scheduler._spawn_thread is swapped for a stub so that the test itself decides
# when each queued download "completes".
# NOTE(review): only fragments of this class are visible in this view; the
# comments below describe just the statements that can actually be seen.
22 class TestSchedular(BaseTest
):
# Teardown: run BaseTest's cleanup, then put the saved scheduler._spawn_thread
# back (testQueuing replaces it with a stub).
27 BaseTest
.tearDown(self
)
28 scheduler
._spawn
_thread
= real_spawn
30 def testQueuing(self
):
31 #import logging; logging.getLogger().setLevel(logging.DEBUG)
# The Site object whose download() method is exercised below.
33 site
= scheduler
.Site()
# Stub installed as scheduler._spawn_thread: it creates a Blocker named after
# the step's URL and files it in `downloads` under that URL, so the test body
# can look the blocker up and trigger it later.
40 def dummy_spawn_thread(step
):
41 resume
= tasks
.Blocker('complete ' + step
.url
)
42 downloads
[step
.url
] = resume
# NOTE(review): the `try` block this handler belongs to is not visible here.
46 except Exception as ex
:
# Install the stub and set the scheduler's per-site download limit to two;
# the length assertions below rely on that limit.
51 scheduler
._spawn
_thread
= dummy_spawn_thread
52 scheduler
.MAX_DOWNLOADS_PER_SITE
= 2
# Build a DummyDownload for a numbered test URL (`i` presumably comes from a
# loop that is not visible here), then queue a DownloadStep on the site and
# keep whatever site.download() returns in `steps`.
55 dl
= DummyDownload("http://step/" + str(i
), downloads
)
57 s
= scheduler
.DownloadStep()
60 steps
.append(site
.download(s
))
64 # Let the first two downloads start
# Yield ten times -- presumably to give the task scheduler a chance to run
# pending work (TODO confirm; the enclosing generator is not visible here).
65 for x
in range(10): yield
# Two entries expected, matching MAX_DOWNLOADS_PER_SITE = 2 set above.
66 self
.assertEqual(2, len(downloads
))
68 # Let one of them complete
# NOTE(review): `downloads.items()[0]` needs a list-returning items(); on
# Python 3 dict views are not subscriptable, so this would need
# list(downloads.items())[0] there -- confirm the target interpreter.
69 url
, blocker
= downloads
.items()[0]
71 for x
in range(10): yield
# A third entry appearing means another queued download has been started.
72 self
.assertEqual(3, len(downloads
))
74 # Check it was successful
75 self
.assertEqual("ok", downloads
[url
])
78 # Let the next one fail
# NOTE(review): same Python 3 caveat as above for `.items()[0]`.
79 url
, blocker
= downloads
.items()[0]
# Trigger the blocker with an exception tuple so that this download is
# reported as failed rather than completed.
80 blocker
.trigger(exception
= (Exception("test"), None))
81 for x
in range(10): yield
82 self
.assertEqual(3, len(downloads
))
85 self
.assertEqual("fail", downloads
[url
])
88 # The last two should both be in progress now.
89 # Allow them both to finish.
90 blockers
= downloads
.values()
93 for x
in range(10): yield
# NOTE(review): on Python 3 `downloads.values()` is a view and never compares
# equal to a list, so this assertion would need list(downloads.values())
# there -- confirm the target interpreter.
94 results
= downloads
.values()
95 self
.assertEqual(["ok", "ok"], results
)
# Drive collect() to completion through the tasks module.
# NOTE(review): collect's definition is not visible in this view.
97 tasks
.wait_for_blocker(collect())
99 if __name__
== '__main__':