All unit-tests now pass with Python 3
[zeroinstall/solver.git] / tests / testdownload.py
blob87a8d89c6c59c436ee7d21ea5e07579edd9ee46d
#!/usr/bin/env python
# Tests for downloading feeds, keys and implementations.
# The statements below are order-sensitive: sys.path and the proxy setting
# are changed before the zeroinstall modules are imported.
from __future__ import with_statement
from basetest import BaseTest
import warnings
import sys, tempfile, os
# StringIO lives in a different module on Python 2 and Python 3.
if sys.version_info[0] > 2:
    from io import StringIO
else:
    from StringIO import StringIO
import unittest, signal
from logging import getLogger, WARN, ERROR
from contextlib import contextmanager

# Put the parent directory first on the import path
# (presumably so the in-tree zeroinstall package is the one imported - confirm).
sys.path.insert(0, '..')

# Point HTTP traffic at localhost:8000
# (presumably where the helper test server listens - confirm against server.py).
os.environ["http_proxy"] = "localhost:8000"

from zeroinstall import helpers
from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
from zeroinstall.injector.requirements import Requirements
from zeroinstall.injector.driver import Driver
# The second statement on this line replaces Store._add_with_helper with a
# stub that always returns False.
from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
from zeroinstall.support import basedir, tasks, ro_rmtree
from zeroinstall.injector import fetch
import data
import my_dbus

import server
# Flag recording whether raise_gui() has been called; tests reset and
# inspect it.
ran_gui = False


def raise_gui(*_ignored):
    """Record that the GUI was requested instead of launching one.

    Accepts (and ignores) any positional arguments and sets the
    module-level ran_gui flag to True.
    """
    global ran_gui
    ran_gui = True
# Replace background._detach with a stub that always returns False
# (trapped_exit() below notes that the background handler runs in the
# same process as the tests).
background._detach = lambda: False
# Selections document with a single local-path selection for Hello.xml;
# testBackgroundApp writes it over an app's selections.xml.
local_hello = """<?xml version="1.0" ?>
<selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
<selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
</selections>"""
@contextmanager
def output_suppressed():
    """Context manager that swallows stdout and stderr for its body.

    Both streams are replaced by fresh StringIO objects and restored on
    exit, whatever happens. Ordinary exceptions pass through unchanged;
    any other BaseException (e.g. SystemExit) is converted into a plain
    Exception whose message is the original type followed by its text.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
@contextmanager
def trapped_exit(expected_exit_status):
    """Context manager asserting that its body exits with the given status.

    While active, os._exit is replaced by a function that raises
    SystemExit when called in the process that entered the context, and
    calls the real os._exit in any other process. The body must end with
    a SystemExit whose code equals expected_exit_status; anything else
    fails an assertion. os._exit is always restored afterwards.
    """
    test_pid = os.getpid()
    real_exit = os._exit

    def exit_or_raise(code):
        if os.getpid() != test_pid:
            # But, child download processes are OK
            real_exit(code)
            return
        # The background handler runs in the same process
        # as the tests, so don't let it abort.
        raise SystemExit(code)

    os._exit = exit_or_raise
    try:
        try:
            yield
        except SystemExit as ex:
            assert ex.code == expected_exit_status
        else:
            # The body returned without exiting at all.
            assert False
    finally:
        os._exit = real_exit
class Reply:
    """Stand-in for sys.stdin that answers every readline() with fixed text."""

    def __init__(self, reply):
        # Text handed back by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; the same text is returned on every call."""
        return self.reply
def download_and_execute(driver, prog_args, main = None):
    """Solve and download via driver_download(), then run the selections.

    The driver's current selections are passed to run.execute_selections
    together with prog_args, the driver's configured stores and main.
    """
    driver_download(driver)
    chosen = driver.solver.selections
    stores = driver.config.stores
    run.execute_selections(chosen, prog_args, stores = stores, main = main)
def driver_download(driver):
    """Call driver.solve_and_download_impls() and wait for the result.

    If the call returns a false value there is nothing to wait for;
    otherwise the returned blocker is passed to tasks.wait_for_blocker.
    """
    blocker = driver.solve_and_download_impls()
    if not blocker:
        return
    tasks.wait_for_blocker(blocker)
class NetworkManager:
    """Stub network-manager object registered in my_dbus.system_services by tests."""

    def state(self):
        """Return 3, the value the original comment labels NM_STATUS_CONNECTED."""
        nm_status_connected = 3
        return nm_status_connected
# Handle of the helper server started by run_server(), or None when no
# server is running.
server_process = None


def kill_server_process():
    """Stop the server recorded in server_process, if there is one.

    Closes the process's stdout, kills it and waits for it to finish.
    Does nothing when no server is recorded. The module-level handle is
    always cleared, even when shutting the process down raises, so that
    one failed shutdown cannot trip run_server()'s "no server running"
    assertion in every later test. Any error is still propagated.
    """
    global server_process
    if server_process is None:
        return
    try:
        # The process may still be running. See
        # http://bugs.python.org/issue14252 for why this is so
        # complicated.
        server_process.stdout.close()
        if os.name != 'nt':
            server_process.kill()
        else:
            try:
                server_process.kill()
            except WindowsError as e:
                # This is what happens when terminate
                # is called after the process has died.
                if e.winerror == 5 and e.strerror == 'Access is denied':
                    assert not server_process.poll()
                else:
                    raise
        server_process.wait()
    finally:
        # Forget the handle whether or not the shutdown succeeded.
        server_process = None
def run_server(*requests):
    """Start the helper server and remember its handle in server_process.

    All positional arguments are forwarded unchanged to
    server.handle_requests. Asserts that no server is currently recorded.
    """
    global server_process
    assert server_process is None
    handle = server.handle_requests(*requests)
    server_process = handle
# Keep the real helpers.get_selections_gui so TestDownload.tearDown() can
# restore it after setUp() swaps in raise_gui.
real_get_selections_gui = helpers.get_selections_gui
class TestDownload(BaseTest):
    """Download tests that talk to the helper server started by run_server().

    setUp() enables downloads, imports the test GPG key and replaces
    helpers.get_selections_gui with raise_gui; tearDown() undoes the
    global changes and stops any server that is still running.
    """

    def setUp(self):
        BaseTest.setUp(self)

        # Tests replace sys.stdin with canned Reply objects (or None);
        # remember the real stream so tearDown() can put it back.
        self._saved_stdin = sys.stdin

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        # Hand data.thomas_key to gpg.import_key through a temporary file,
        # closing the file even if the import fails.
        stream = tempfile.TemporaryFile()
        try:
            stream.write(data.thomas_key)
            stream.seek(0)
            gpg.import_key(stream)
        finally:
            stream.close()

        trust.trust_db.watchers = []

        helpers.get_selections_gui = raise_gui

        global ran_gui
        ran_gui = False

    def tearDown(self):
        sys.stdin = self._saved_stdin
        helpers.get_selections_gui = real_get_selections_gui
        try:
            BaseTest.tearDown(self)
        finally:
            # Stop the server even if the base tearDown fails, otherwise
            # the next run_server() call would trip its assertion.
            kill_server_process()

    # Answering "N" to the key prompt must leave no usable implementation
    # and a "Not signed with a trusted key" error on the handler.
    def testRejectKey(self):
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    raise self.config.handler.ex
                self.config.handler.ex = None

    # Same as testRejectKey, but for the example.com Hello.xml feed.
    def testRejectKeyXML(self):
        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    raise
                self.config.handler.ex = None

    # --import of a missing file fails; importing 'Hello' prompts once,
    # trusts the key, and does not need to prompt a second time.
    def testImport(self):
        from zeroinstall.injector import cli

        rootLogger = getLogger()
        rootLogger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
                assert 0
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            rootLogger.disabled = False
            rootLogger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEqual(None, hello)

        with output_suppressed():
            run_server('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config = self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
            self.assertEqual(1, len(hello.implementations))

            self.assertEqual(None, hello.local_path)

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config = self.config)

    # --download-only on a selections file stores the implementation,
    # after which nothing is reported as missing.
    def testSelections(self):
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            # The implementation must not be in the stores yet.
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                assert False
            except NotStored:
                pass
            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    # helpers.ensure_cached() returns selections whose implementation
    # has been stored.
    def testHelpers(self):
        from zeroinstall import helpers

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    # Like testSelections, but the feed is downloaded and imported before
    # --download-only runs.
    def testSelectionsWithFeed(self):
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    # Answering "Y" lets the download proceed; execution then fails only
    # because the requested main ("Missing") does not exist.
    def testAcceptKey(self):
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    # With auto_approve_keys set, an empty reply on stdin is enough.
    def testAutoAcceptKey(self):
        self.config.auto_approve_keys = True
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    # After solving, the master feed has no implementations of its own
    # and its distribution feed has two.
    def testDistro(self):
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements(native_url), config = self.config)
            assert driver.need_download()

            solve = driver.solve_with_downloads()
            tasks.wait_for_blocker(solve)
            tasks.check(solve)

            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    # An archive whose size differs from the feed must be rejected.
    def testWrongSize(self):
        with output_suppressed():
            run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                       '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise

    # Recipe.xml downloads its archives; execution then fails on the
    # missing "HelloWorld/Missing".
    def testRecipe(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    # RecipeRename.xml must leave HelloUniverse/minor and neither
    # HelloWorld nor HelloUniverse/main in the stored implementation.
    def testRename(self):
        with output_suppressed():
            run_server(('HelloWorld.tar.bz2',))
            requirements = Requirements(os.path.abspath('RecipeRename.xml'))
            requirements.command = None
            driver = Driver(requirements = requirements, config = self.config)
            driver_download(driver)
            digests = driver.solver.selections[requirements.interface_uri].digests
            path = self.config.stores.lookup_any(digests)
            assert os.path.exists(os.path.join(path, 'HelloUniverse', 'minor'))
            assert not os.path.exists(os.path.join(path, 'HelloWorld'))
            assert not os.path.exists(os.path.join(path, 'HelloUniverse', 'main'))

    # Unpacking a directory over a symlink must be refused.
    def testSymlink(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    # Autopackage.xml downloads; execution then fails on the missing
    # "HelloWorld/Missing".
    def testAutopackage(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('HelloWorld.autopackage')
            driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    # With the server started as run_server('*'), the recipe download
    # must fail with a DownloadError mentioning "Connection".
    def testRecipeFailure(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('*')
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except download.DownloadError as ex:
                if "Connection" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    # When /Hello.xml gives a 404, the solve must still become ready
    # using the configured feed mirror.
    def testMirrors(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'),
                       '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
                       '/0mirror/keys/6FCF121BE2390E0B.gpg')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            refreshed = driver.solve_with_downloads()
            tasks.wait_for_blocker(refreshed)
            assert driver.solver.ready
        finally:
            sys.stdout = old_out

    # An older feed must not replace a newer cached one, whether it
    # arrives from the mirror or from upstream.
    def testReplay(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
            mtime = int(os.stat('Hello-new.xml').st_mtime)
            with open('Hello-new.xml', 'rb') as stream:
                self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000)

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            # Update from mirror (should ignore out-of-date timestamp)
            refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            tasks.wait_for_blocker(refreshed)

            # Update from upstream (should report an error)
            refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            try:
                tasks.wait_for_blocker(refreshed)
                raise Exception("Should have been rejected!")
            except model.SafeException as ex:
                assert "New feed's modification time is before old version" in str(ex)

            # Must finish with the newest version
            self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
        finally:
            sys.stdout = old_out

    # A background update that needs a download must end up calling the
    # (stubbed) GUI and exit with status 1.
    def testBackground(self, verbose = False):
        r = Requirements('http://example.com:8000/Hello.xml')
        d = Driver(requirements = r, config = self.config)
        self.import_feed(r.interface_uri, 'Hello.xml')
        self.config.freshness = 0
        self.config.network_use = model.network_minimal
        d.solver.solve(r.interface_uri, arch.get_host_architecture())
        assert d.solver.ready, d.solver.get_failure_reason()

        # "async" is a reserved word from Python 3.7 on, so writing
        # `tasks.async` is a syntax error there; fetch the decorator by
        # name instead.
        @getattr(tasks, 'async')
        def choose_download(registed_cb, nid, actions):
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except Exception:
                # Report the problem but let the task finish normally.
                import traceback
                traceback.print_exc()
            yield None

        global ran_gui
        ran_gui = False
        os.environ['DISPLAY'] = 'dummy'
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download

            with trapped_exit(1):
                from zeroinstall.injector import config
                # Disable the default key lookup server for the duration
                # of the update, restoring it afterwards.
                key_info = config.DEFAULT_KEY_LOOKUP_SERVER
                config.DEFAULT_KEY_LOOKUP_SERVER = None
                try:
                    background.spawn_background_update(d, verbose)
                finally:
                    config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        self.testBackground(verbose = True)

    # Walks an app through a series of background updates: none due, no
    # changes, metadata changes, GUI needed, no DISPLAY, untrusted key,
    # and finally an update skipped because of last-check-attempt.
    def testBackgroundApp(self):
        my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

        global ran_gui

        with output_suppressed():
            # Select a version of Hello
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
            r = Requirements('http://example.com:8000/Hello.xml')
            driver = Driver(requirements = r, config = self.config)
            tasks.wait_for_blocker(driver.solve_with_downloads())
            assert driver.solver.ready
            kill_server_process()

            # Save it as an app
            app = self.config.app_mgr.create_app('test-app', r)
            app.set_selections(driver.solver.selections)
            timestamp = os.path.join(app.path, 'last-checked')
            last_check_attempt = os.path.join(app.path, 'last-check-attempt')
            selections_path = os.path.join(app.path, 'selections.xml')

            def reset_timestamps():
                # NOTE(review): this assignment binds a *local* name. The
                # enclosing `global ran_gui` does not apply inside this
                # nested function, so the module-level flag is NOT reset
                # here. The later `assert ran_gui` checks may rely on the
                # stale value; confirm before adding a `global` statement.
                ran_gui = False
                os.utime(timestamp, (1, 1))        # 1970
                os.utime(selections_path, (1, 1))
                if os.path.exists(last_check_attempt):
                    os.unlink(last_check_attempt)

            # Download the implementation
            sels = app.get_selections()
            run_server('HelloWorld.tgz')
            tasks.wait_for_blocker(app.download_selections(sels))
            kill_server_process()

            # Not time for a background update yet
            self.config.freshness = 100
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

            # Trigger a background update - no updates found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Change the selections
            sels_path = os.path.join(app.path, 'selections.xml')
            with open(sels_path) as stream:
                old = stream.read()
            with open(sels_path, 'w') as stream:
                stream.write(old.replace('Hello', 'Goodbye'))

            # Trigger another background update - metadata changes found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Trigger another background update - GUI needed now

            # Delete cached implementation so we need to download it again
            stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
            assert os.path.basename(stored).startswith('sha1')
            ro_rmtree(stored)

            # Replace with a valid local feed so we don't have to download immediately
            with open(sels_path, 'w') as stream:
                stream.write(local_hello)
            sels = app.get_selections()

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui    # (so doesn't actually update)
            kill_server_process()

            # Now again with no DISPLAY
            reset_timestamps()
            del os.environ['DISPLAY']
            run_server('Hello.xml', 'HelloWorld.tgz')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui    # (so doesn't actually update)

            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            sels = app.get_selections()
            sel, = sels.selections.values()
            self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)

            # Untrust the key
            trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                #import logging; logging.getLogger().setLevel(logging.INFO)
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui
            kill_server_process()

            # Update not triggered because of last-check-attempt
            ran_gui = False
            os.utime(timestamp, (1, 1))        # 1970
            os.utime(selections_path, (1, 1))
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

    # abort() must mark the download as aborted and drop its temporary
    # file, with and without auto_delete.
    def testAbort(self):
        dl = download.Download("http://localhost/test.tgz", auto_delete = True)
        dl.abort()
        assert dl._aborted.happened
        assert dl.tempfile is None

        dl = download.Download("http://localhost/test.tgz", auto_delete = False)
        path = dl.tempfile.name
        dl.abort()
        assert not os.path.exists(path)
        assert dl._aborted.happened
        assert dl.tempfile is None
# When run as a script, execute the tests and make sure the helper server
# is stopped afterwards; the finally clause runs whatever way
# unittest.main() ends.
if __name__ == '__main__':
    try:
        unittest.main()
    finally:
        kill_server_process()