In unit-tests, always use StringIO, not io
[zeroinstall.git] / tests / testdownload.py
blobddaff4ed1be873f239b0c42cd28a90d69186fea6
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest, StringIO
4 import sys, tempfile, os
5 import unittest
6 from logging import getLogger, WARN, ERROR
7 from contextlib import contextmanager
9 sys.path.insert(0, '..')
11 os.environ["http_proxy"] = "localhost:8000"
13 from zeroinstall import helpers
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.requirements import Requirements
16 from zeroinstall.injector.driver import Driver
17 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
18 from zeroinstall.support import basedir, tasks, ro_rmtree
19 from zeroinstall.injector import fetch
20 import data
21 import my_dbus
23 import server
25 ran_gui = False
26 def raise_gui(*args):
27 global ran_gui
28 ran_gui = True
29 background._detach = lambda: False
31 local_hello = """<?xml version="1.0" ?>
32 <selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
33 <selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
34 </selections>"""
36 @contextmanager
37 def output_suppressed():
38 old_stdout = sys.stdout
39 old_stderr = sys.stderr
40 try:
41 sys.stdout = StringIO()
42 sys.stderr = StringIO()
43 try:
44 yield
45 except Exception:
46 raise
47 except BaseException as ex:
48 # Don't abort unit-tests if someone raises SystemExit
49 raise Exception(str(type(ex)) + " " + str(ex))
50 finally:
51 sys.stdout = old_stdout
52 sys.stderr = old_stderr
54 @contextmanager
55 def trapped_exit(expected_exit_status):
56 pid = os.getpid()
57 old_exit = os._exit
58 def my_exit(code):
59 # The background handler runs in the same process
60 # as the tests, so don't let it abort.
61 if os.getpid() == pid:
62 raise SystemExit(code)
63 # But, child download processes are OK
64 old_exit(code)
65 os._exit = my_exit
66 try:
67 try:
68 yield
69 assert False
70 except SystemExit as ex:
71 assert ex.code == expected_exit_status
72 finally:
73 os._exit = old_exit
75 class Reply:
76 def __init__(self, reply):
77 self.reply = reply
79 def readline(self):
80 return self.reply
82 def download_and_execute(driver, prog_args, main = None):
83 driver_download(driver)
84 run.execute_selections(driver.solver.selections, prog_args, stores = driver.config.stores, main = main)
86 def driver_download(driver):
87 downloaded = driver.solve_and_download_impls()
88 if downloaded:
89 tasks.wait_for_blocker(downloaded)
91 class NetworkManager:
92 def state(self):
93 return 3 # NM_STATUS_CONNECTED
95 server_process = None
96 def kill_server_process():
97 global server_process
98 if server_process is not None:
99 # The process may still be running. See
100 # http://bugs.python.org/issue14252 for why this is so
101 # complicated.
102 server_process.stdout.close()
103 if os.name != 'nt':
104 server_process.kill()
105 else:
106 try:
107 server_process.kill()
108 except WindowsError as e:
109 # This is what happens when terminate
110 # is called after the process has died.
111 if e.winerror == 5 and e.strerror == 'Access is denied':
112 assert not server_process.poll()
113 else:
114 raise
115 server_process.wait()
116 server_process = None
118 def run_server(*args):
119 global server_process
120 assert server_process is None
121 server_process = server.handle_requests(*args)
123 real_get_selections_gui = helpers.get_selections_gui
125 class TestDownload(BaseTest):
126 def setUp(self):
127 BaseTest.setUp(self)
129 self.config.handler.allow_downloads = True
130 self.config.key_info_server = 'http://localhost:3333/key-info'
132 self.config.fetcher = fetch.Fetcher(self.config)
134 stream = tempfile.TemporaryFile()
135 stream.write(data.thomas_key)
136 stream.seek(0)
137 gpg.import_key(stream)
138 stream.close()
140 trust.trust_db.watchers = []
142 helpers.get_selections_gui = raise_gui
144 global ran_gui
145 ran_gui = False
147 def tearDown(self):
148 helpers.get_selections_gui = real_get_selections_gui
149 BaseTest.tearDown(self)
150 kill_server_process()
152 def testRejectKey(self):
153 with output_suppressed():
154 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
155 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
156 assert driver.need_download()
157 sys.stdin = Reply("N\n")
158 try:
159 download_and_execute(driver, ['Hello'])
160 assert 0
161 except model.SafeException as ex:
162 if "has no usable implementations" not in str(ex):
163 raise ex
164 if "Not signed with a trusted key" not in str(self.config.handler.ex):
165 raise self.config.handler.ex
166 self.config.handler.ex = None
168 def testRejectKeyXML(self):
169 with output_suppressed():
170 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
171 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
172 assert driver.need_download()
173 sys.stdin = Reply("N\n")
174 try:
175 download_and_execute(driver, ['Hello'])
176 assert 0
177 except model.SafeException as ex:
178 if "has no usable implementations" not in str(ex):
179 raise ex
180 if "Not signed with a trusted key" not in str(self.config.handler.ex):
181 raise
182 self.config.handler.ex = None
184 def testImport(self):
185 from zeroinstall.injector import cli
187 rootLogger = getLogger()
188 rootLogger.disabled = True
189 try:
190 try:
191 cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
192 assert 0
193 except model.SafeException as ex:
194 assert 'NO-SUCH-FILE' in str(ex)
195 finally:
196 rootLogger.disabled = False
197 rootLogger.setLevel(WARN)
199 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
200 self.assertEqual(None, hello)
202 with output_suppressed():
203 run_server('6FCF121BE2390E0B.gpg')
204 sys.stdin = Reply("Y\n")
206 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
207 cli.main(['--import', 'Hello'], config = self.config)
208 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
210 # Check we imported the interface after trusting the key
211 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
212 self.assertEqual(1, len(hello.implementations))
214 self.assertEqual(None, hello.local_path)
216 # Shouldn't need to prompt the second time
217 sys.stdin = None
218 cli.main(['--import', 'Hello'], config = self.config)
220 def testSelections(self):
221 from zeroinstall.injector import cli
222 with open("selections.xml", 'rb') as stream:
223 root = qdom.parse(stream)
224 sels = selections.Selections(root)
225 class Options: dry_run = False
227 with output_suppressed():
228 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
229 sys.stdin = Reply("Y\n")
230 try:
231 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
232 assert False
233 except NotStored:
234 pass
235 cli.main(['--download-only', 'selections.xml'], config = self.config)
236 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
237 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
239 assert sels.download_missing(self.config) is None
241 def testHelpers(self):
242 from zeroinstall import helpers
244 with output_suppressed():
245 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
246 sys.stdin = Reply("Y\n")
247 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
248 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
249 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
250 assert sels.download_missing(self.config) is None
252 def testSelectionsWithFeed(self):
253 from zeroinstall.injector import cli
254 with open("selections.xml", 'rb') as stream:
255 root = qdom.parse(stream)
256 sels = selections.Selections(root)
258 with output_suppressed():
259 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
260 sys.stdin = Reply("Y\n")
262 tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
264 cli.main(['--download-only', 'selections.xml'], config = self.config)
265 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
266 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
268 assert sels.download_missing(self.config) is None
270 def testAcceptKey(self):
271 with output_suppressed():
272 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
273 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
274 assert driver.need_download()
275 sys.stdin = Reply("Y\n")
276 try:
277 download_and_execute(driver, ['Hello'], main = 'Missing')
278 assert 0
279 except model.SafeException as ex:
280 if "HelloWorld/Missing" not in str(ex):
281 raise
283 def testAutoAcceptKey(self):
284 self.config.auto_approve_keys = True
285 with output_suppressed():
286 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
287 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
288 assert driver.need_download()
289 sys.stdin = Reply("")
290 try:
291 download_and_execute(driver, ['Hello'], main = 'Missing')
292 assert 0
293 except model.SafeException as ex:
294 if "HelloWorld/Missing" not in str(ex):
295 raise
297 def testDistro(self):
298 with output_suppressed():
299 native_url = 'http://example.com:8000/Native.xml'
301 # Initially, we don't have the feed at all...
302 master_feed = self.config.iface_cache.get_feed(native_url)
303 assert master_feed is None, master_feed
305 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
306 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
307 driver = Driver(requirements = Requirements(native_url), config = self.config)
308 assert driver.need_download()
310 solve = driver.solve_with_downloads()
311 tasks.wait_for_blocker(solve)
312 tasks.check(solve)
314 master_feed = self.config.iface_cache.get_feed(native_url)
315 assert master_feed is not None
316 assert master_feed.implementations == {}
318 distro_feed_url = master_feed.get_distro_feed()
319 assert distro_feed_url is not None
320 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
321 assert distro_feed is not None
322 assert len(distro_feed.implementations) == 2, distro_feed.implementations
324 def testWrongSize(self):
325 with output_suppressed():
326 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
327 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
328 driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
329 assert driver.need_download()
330 sys.stdin = Reply("Y\n")
331 try:
332 download_and_execute(driver, ['Hello'], main = 'Missing')
333 assert 0
334 except model.SafeException as ex:
335 if "Downloaded archive has incorrect size" not in str(ex):
336 raise ex
338 def testRecipe(self):
339 old_out = sys.stdout
340 try:
341 sys.stdout = StringIO()
342 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
343 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
344 try:
345 download_and_execute(driver, [])
346 assert False
347 except model.SafeException as ex:
348 if "HelloWorld/Missing" not in str(ex):
349 raise ex
350 finally:
351 sys.stdout = old_out
353 def testRename(self):
354 with output_suppressed():
355 run_server(('HelloWorld.tar.bz2',))
356 requirements = Requirements(os.path.abspath('RecipeRename.xml'))
357 requirements.command = None
358 driver = Driver(requirements = requirements, config = self.config)
359 driver_download(driver)
360 digests = driver.solver.selections[requirements.interface_uri].digests
361 path = self.config.stores.lookup_any(digests)
362 assert os.path.exists(os.path.join(path, 'HelloUniverse', 'minor'))
363 assert not os.path.exists(os.path.join(path, 'HelloWorld'))
364 assert not os.path.exists(os.path.join(path, 'HelloUniverse', 'main'))
366 def testSymlink(self):
367 old_out = sys.stdout
368 try:
369 sys.stdout = StringIO()
370 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
371 driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
372 try:
373 download_and_execute(driver, [])
374 assert False
375 except model.SafeException as ex:
376 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
377 raise
378 self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
379 finally:
380 sys.stdout = old_out
382 def testAutopackage(self):
383 old_out = sys.stdout
384 try:
385 sys.stdout = StringIO()
386 run_server('HelloWorld.autopackage')
387 driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
388 try:
389 download_and_execute(driver, [])
390 assert False
391 except model.SafeException as ex:
392 if "HelloWorld/Missing" not in str(ex):
393 raise
394 finally:
395 sys.stdout = old_out
397 def testRecipeFailure(self):
398 old_out = sys.stdout
399 try:
400 sys.stdout = StringIO()
401 run_server('*')
402 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
403 try:
404 download_and_execute(driver, [])
405 assert False
406 except download.DownloadError as ex:
407 if "Connection" not in str(ex):
408 raise
409 finally:
410 sys.stdout = old_out
412 def testMirrors(self):
413 old_out = sys.stdout
414 try:
415 sys.stdout = StringIO()
416 getLogger().setLevel(ERROR)
417 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
418 run_server(server.Give404('/Hello.xml'),
419 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
420 '/0mirror/keys/6FCF121BE2390E0B.gpg')
421 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
422 self.config.feed_mirror = 'http://example.com:8000/0mirror'
424 refreshed = driver.solve_with_downloads()
425 tasks.wait_for_blocker(refreshed)
426 assert driver.solver.ready
427 finally:
428 sys.stdout = old_out
430 def testReplay(self):
431 old_out = sys.stdout
432 try:
433 sys.stdout = StringIO()
434 getLogger().setLevel(ERROR)
435 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
436 mtime = int(os.stat('Hello-new.xml').st_mtime)
437 with open('Hello-new.xml', 'rb') as stream:
438 self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000)
440 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
441 run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
442 self.config.feed_mirror = 'http://example.com:8000/0mirror'
444 # Update from mirror (should ignore out-of-date timestamp)
445 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
446 tasks.wait_for_blocker(refreshed)
448 # Update from upstream (should report an error)
449 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
450 try:
451 tasks.wait_for_blocker(refreshed)
452 raise Exception("Should have been rejected!")
453 except model.SafeException as ex:
454 assert "New feed's modification time is before old version" in str(ex)
456 # Must finish with the newest version
457 self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
458 finally:
459 sys.stdout = old_out
461 def testBackground(self, verbose = False):
462 r = Requirements('http://example.com:8000/Hello.xml')
463 d = Driver(requirements = r, config = self.config)
464 self.import_feed(r.interface_uri, 'Hello.xml')
465 self.config.freshness = 0
466 self.config.network_use = model.network_minimal
467 d.solver.solve(r.interface_uri, arch.get_host_architecture())
468 assert d.solver.ready, d.solver.get_failure_reason()
470 @tasks.async
471 def choose_download(registed_cb, nid, actions):
472 try:
473 assert actions == ['download', 'Download'], actions
474 registed_cb(nid, 'download')
475 except:
476 import traceback
477 traceback.print_exc()
478 yield None
480 global ran_gui
481 ran_gui = False
482 os.environ['DISPLAY'] = 'dummy'
483 old_out = sys.stdout
484 try:
485 sys.stdout = StringIO()
486 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
487 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
488 my_dbus.user_callback = choose_download
490 with trapped_exit(1):
491 from zeroinstall.injector import config
492 key_info = config.DEFAULT_KEY_LOOKUP_SERVER
493 config.DEFAULT_KEY_LOOKUP_SERVER = None
494 try:
495 background.spawn_background_update(d, verbose)
496 finally:
497 config.DEFAULT_KEY_LOOKUP_SERVER = key_info
498 finally:
499 sys.stdout = old_out
500 assert ran_gui
502 def testBackgroundVerbose(self):
503 self.testBackground(verbose = True)
505 def testBackgroundApp(self):
506 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
508 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
510 global ran_gui
512 with output_suppressed():
513 # Select a version of Hello
514 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
515 r = Requirements('http://example.com:8000/Hello.xml')
516 driver = Driver(requirements = r, config = self.config)
517 tasks.wait_for_blocker(driver.solve_with_downloads())
518 assert driver.solver.ready
519 kill_server_process()
521 # Save it as an app
522 app = self.config.app_mgr.create_app('test-app', r)
523 app.set_selections(driver.solver.selections)
524 timestamp = os.path.join(app.path, 'last-checked')
525 last_check_attempt = os.path.join(app.path, 'last-check-attempt')
526 selections_path = os.path.join(app.path, 'selections.xml')
528 def reset_timestamps():
529 global ran_gui
530 ran_gui = False
531 os.utime(timestamp, (1, 1)) # 1970
532 os.utime(selections_path, (1, 1))
533 if os.path.exists(last_check_attempt):
534 os.unlink(last_check_attempt)
536 # Download the implementation
537 sels = app.get_selections()
538 run_server('HelloWorld.tgz')
539 tasks.wait_for_blocker(app.download_selections(sels))
540 kill_server_process()
542 # Not time for a background update yet
543 self.config.freshness = 100
544 dl = app.download_selections(sels)
545 assert dl == None
546 assert not ran_gui
548 # Trigger a background update - no updates found
549 reset_timestamps()
550 run_server('Hello.xml')
551 with trapped_exit(1):
552 dl = app.download_selections(sels)
553 assert dl == None
554 assert not ran_gui
555 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
556 self.assertEqual(1, os.stat(selections_path).st_mtime)
557 kill_server_process()
559 # Change the selections
560 sels_path = os.path.join(app.path, 'selections.xml')
561 with open(sels_path) as stream:
562 old = stream.read()
563 with open(sels_path, 'w') as stream:
564 stream.write(old.replace('Hello', 'Goodbye'))
566 # Trigger another background update - metadata changes found
567 reset_timestamps()
568 run_server('Hello.xml')
569 with trapped_exit(1):
570 dl = app.download_selections(sels)
571 assert dl == None
572 assert not ran_gui
573 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
574 self.assertNotEqual(1, os.stat(selections_path).st_mtime)
575 kill_server_process()
577 # Trigger another background update - GUI needed now
579 # Delete cached implementation so we need to download it again
580 stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
581 assert os.path.basename(stored).startswith('sha1')
582 ro_rmtree(stored)
584 # Replace with a valid local feed so we don't have to download immediately
585 with open(sels_path, 'w') as stream:
586 stream.write(local_hello)
587 sels = app.get_selections()
589 os.environ['DISPLAY'] = 'dummy'
590 reset_timestamps()
591 run_server('Hello.xml')
592 with trapped_exit(1):
593 dl = app.download_selections(sels)
594 assert dl == None
595 assert ran_gui # (so doesn't actually update)
596 kill_server_process()
598 # Now again with no DISPLAY
599 reset_timestamps()
600 del os.environ['DISPLAY']
601 run_server('Hello.xml', 'HelloWorld.tgz')
602 with trapped_exit(1):
603 dl = app.download_selections(sels)
604 assert dl == None
605 assert not ran_gui # (so doesn't actually update)
607 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
608 self.assertNotEqual(1, os.stat(selections_path).st_mtime)
609 kill_server_process()
611 sels = app.get_selections()
612 sel, = sels.selections.values()
613 self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)
615 # Untrust the key
616 trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
618 os.environ['DISPLAY'] = 'dummy'
619 reset_timestamps()
620 run_server('Hello.xml')
621 with trapped_exit(1):
622 #import logging; logging.getLogger().setLevel(logging.INFO)
623 dl = app.download_selections(sels)
624 assert dl == None
625 assert ran_gui
626 kill_server_process()
628 # Update not triggered because of last-check-attempt
629 ran_gui = False
630 os.utime(timestamp, (1, 1)) # 1970
631 os.utime(selections_path, (1, 1))
632 dl = app.download_selections(sels)
633 assert dl == None
634 assert not ran_gui
636 def testAbort(self):
637 dl = download.Download("http://localhost/test.tgz", auto_delete = True)
638 dl.abort()
639 assert dl._aborted.happened
640 assert dl.tempfile is None
642 dl = download.Download("http://localhost/test.tgz", auto_delete = False)
643 path = dl.tempfile.name
644 dl.abort()
645 assert not os.path.exists(path)
646 assert dl._aborted.happened
647 assert dl.tempfile is None
649 if __name__ == '__main__':
650 try:
651 unittest.main()
652 finally:
653 kill_server_process()