Added support for implementation mirrors
[zeroinstall/solver.git] / tests / testdownload.py
blob25431df5689ba359cf32796d01dca57dd18ba294
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest, StringIO
4 import sys, tempfile, os
5 import unittest
6 import logging
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall import helpers
15 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
16 from zeroinstall.injector.requirements import Requirements
17 from zeroinstall.injector.driver import Driver
18 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
19 from zeroinstall.support import basedir, tasks, ro_rmtree
20 from zeroinstall.injector import fetch
21 import data
22 import my_dbus
24 import server
26 ran_gui = False
27 def raise_gui(*args):
28 global ran_gui
29 ran_gui = True
30 background._detach = lambda: False
32 local_hello = """<?xml version="1.0" ?>
33 <selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
34 <selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
35 </selections>"""
37 @contextmanager
38 def output_suppressed():
39 old_stdout = sys.stdout
40 old_stderr = sys.stderr
41 try:
42 sys.stdout = StringIO()
43 sys.stderr = StringIO()
44 try:
45 yield
46 except Exception:
47 raise
48 except BaseException as ex:
49 # Don't abort unit-tests if someone raises SystemExit
50 raise Exception(str(type(ex)) + " " + str(ex))
51 finally:
52 sys.stdout = old_stdout
53 sys.stderr = old_stderr
55 @contextmanager
56 def trapped_exit(expected_exit_status):
57 pid = os.getpid()
58 old_exit = os._exit
59 def my_exit(code):
60 # The background handler runs in the same process
61 # as the tests, so don't let it abort.
62 if os.getpid() == pid:
63 raise SystemExit(code)
64 # But, child download processes are OK
65 old_exit(code)
66 os._exit = my_exit
67 try:
68 try:
69 yield
70 assert False
71 except SystemExit as ex:
72 assert ex.code == expected_exit_status
73 finally:
74 os._exit = old_exit
76 class Reply:
77 def __init__(self, reply):
78 self.reply = reply
80 def readline(self):
81 return self.reply
83 def download_and_execute(driver, prog_args, main = None):
84 driver_download(driver)
85 run.execute_selections(driver.solver.selections, prog_args, stores = driver.config.stores, main = main)
87 def driver_download(driver):
88 downloaded = driver.solve_and_download_impls()
89 if downloaded:
90 tasks.wait_for_blocker(downloaded)
92 class NetworkManager:
93 def state(self):
94 return 3 # NM_STATUS_CONNECTED
96 server_process = None
97 def kill_server_process():
98 global server_process
99 if server_process is not None:
100 # The process may still be running. See
101 # http://bugs.python.org/issue14252 for why this is so
102 # complicated.
103 server_process.stdout.close()
104 if os.name != 'nt':
105 server_process.kill()
106 else:
107 try:
108 server_process.kill()
109 except WindowsError as e:
110 # This is what happens when terminate
111 # is called after the process has died.
112 if e.winerror == 5 and e.strerror == 'Access is denied':
113 assert not server_process.poll()
114 else:
115 raise
116 server_process.wait()
117 server_process = None
119 def run_server(*args):
120 global server_process
121 assert server_process is None
122 server_process = server.handle_requests(*args)
124 real_get_selections_gui = helpers.get_selections_gui
126 class TestDownload(BaseTest):
127 def setUp(self):
128 BaseTest.setUp(self)
130 self.config.handler.allow_downloads = True
131 self.config.key_info_server = 'http://localhost:3333/key-info'
133 self.config.fetcher = fetch.Fetcher(self.config)
135 stream = tempfile.TemporaryFile()
136 stream.write(data.thomas_key)
137 stream.seek(0)
138 gpg.import_key(stream)
139 stream.close()
141 trust.trust_db.watchers = []
143 helpers.get_selections_gui = raise_gui
145 global ran_gui
146 ran_gui = False
148 def tearDown(self):
149 helpers.get_selections_gui = real_get_selections_gui
150 BaseTest.tearDown(self)
151 kill_server_process()
153 def testRejectKey(self):
154 with output_suppressed():
155 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
156 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
157 assert driver.need_download()
158 sys.stdin = Reply("N\n")
159 try:
160 download_and_execute(driver, ['Hello'])
161 assert 0
162 except model.SafeException as ex:
163 if "has no usable implementations" not in str(ex):
164 raise ex
165 if "Not signed with a trusted key" not in str(self.config.handler.ex):
166 raise self.config.handler.ex
167 self.config.handler.ex = None
169 def testRejectKeyXML(self):
170 with output_suppressed():
171 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
172 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
173 assert driver.need_download()
174 sys.stdin = Reply("N\n")
175 try:
176 download_and_execute(driver, ['Hello'])
177 assert 0
178 except model.SafeException as ex:
179 if "has no usable implementations" not in str(ex):
180 raise ex
181 if "Not signed with a trusted key" not in str(self.config.handler.ex):
182 raise
183 self.config.handler.ex = None
185 def testImport(self):
186 from zeroinstall.injector import cli
188 rootLogger = getLogger()
189 rootLogger.disabled = True
190 try:
191 try:
192 cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
193 assert 0
194 except model.SafeException as ex:
195 assert 'NO-SUCH-FILE' in str(ex)
196 finally:
197 rootLogger.disabled = False
198 rootLogger.setLevel(WARN)
200 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
201 self.assertEqual(None, hello)
203 with output_suppressed():
204 run_server('6FCF121BE2390E0B.gpg')
205 sys.stdin = Reply("Y\n")
207 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
208 cli.main(['--import', 'Hello'], config = self.config)
209 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
211 # Check we imported the interface after trusting the key
212 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
213 self.assertEqual(1, len(hello.implementations))
215 self.assertEqual(None, hello.local_path)
217 # Shouldn't need to prompt the second time
218 sys.stdin = None
219 cli.main(['--import', 'Hello'], config = self.config)
221 def testSelections(self):
222 from zeroinstall.injector import cli
223 with open("selections.xml", 'rb') as stream:
224 root = qdom.parse(stream)
225 sels = selections.Selections(root)
226 class Options: dry_run = False
228 with output_suppressed():
229 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
230 sys.stdin = Reply("Y\n")
231 try:
232 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
233 assert False
234 except NotStored:
235 pass
236 cli.main(['--download-only', 'selections.xml'], config = self.config)
237 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
238 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
240 assert sels.download_missing(self.config) is None
242 def testHelpers(self):
243 from zeroinstall import helpers
245 with output_suppressed():
246 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
247 sys.stdin = Reply("Y\n")
248 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
249 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
250 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
251 assert sels.download_missing(self.config) is None
253 def testSelectionsWithFeed(self):
254 from zeroinstall.injector import cli
255 with open("selections.xml", 'rb') as stream:
256 root = qdom.parse(stream)
257 sels = selections.Selections(root)
259 with output_suppressed():
260 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
261 sys.stdin = Reply("Y\n")
263 tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
265 cli.main(['--download-only', 'selections.xml'], config = self.config)
266 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
267 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
269 assert sels.download_missing(self.config) is None
271 def testAcceptKey(self):
272 with output_suppressed():
273 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
274 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
275 assert driver.need_download()
276 sys.stdin = Reply("Y\n")
277 try:
278 download_and_execute(driver, ['Hello'], main = 'Missing')
279 assert 0
280 except model.SafeException as ex:
281 if "HelloWorld/Missing" not in str(ex):
282 raise
284 def testAutoAcceptKey(self):
285 self.config.auto_approve_keys = True
286 with output_suppressed():
287 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
288 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
289 assert driver.need_download()
290 sys.stdin = Reply("")
291 try:
292 download_and_execute(driver, ['Hello'], main = 'Missing')
293 assert 0
294 except model.SafeException as ex:
295 if "HelloWorld/Missing" not in str(ex):
296 raise
298 def testDistro(self):
299 with output_suppressed():
300 native_url = 'http://example.com:8000/Native.xml'
302 # Initially, we don't have the feed at all...
303 master_feed = self.config.iface_cache.get_feed(native_url)
304 assert master_feed is None, master_feed
306 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
307 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
308 driver = Driver(requirements = Requirements(native_url), config = self.config)
309 assert driver.need_download()
311 solve = driver.solve_with_downloads()
312 tasks.wait_for_blocker(solve)
313 tasks.check(solve)
315 master_feed = self.config.iface_cache.get_feed(native_url)
316 assert master_feed is not None
317 assert master_feed.implementations == {}
319 distro_feed_url = master_feed.get_distro_feed()
320 assert distro_feed_url is not None
321 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
322 assert distro_feed is not None
323 assert len(distro_feed.implementations) == 2, distro_feed.implementations
325 def testWrongSize(self):
326 with output_suppressed():
327 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
328 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
329 driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
330 assert driver.need_download()
331 sys.stdin = Reply("Y\n")
332 try:
333 download_and_execute(driver, ['Hello'], main = 'Missing')
334 assert 0
335 except model.SafeException as ex:
336 if "Downloaded archive has incorrect size" not in str(ex):
337 raise ex
339 def testRecipe(self):
340 old_out = sys.stdout
341 try:
342 sys.stdout = StringIO()
343 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
344 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
345 try:
346 download_and_execute(driver, [])
347 assert False
348 except model.SafeException as ex:
349 if "HelloWorld/Missing" not in str(ex):
350 raise ex
351 finally:
352 sys.stdout = old_out
354 def testRename(self):
355 with output_suppressed():
356 run_server(('HelloWorld.tar.bz2',))
357 requirements = Requirements(os.path.abspath('RecipeRename.xml'))
358 requirements.command = None
359 driver = Driver(requirements = requirements, config = self.config)
360 driver_download(driver)
361 digests = driver.solver.selections[requirements.interface_uri].digests
362 path = self.config.stores.lookup_any(digests)
363 assert os.path.exists(os.path.join(path, 'HelloUniverse', 'minor'))
364 assert not os.path.exists(os.path.join(path, 'HelloWorld'))
365 assert not os.path.exists(os.path.join(path, 'HelloUniverse', 'main'))
367 def testSymlink(self):
368 old_out = sys.stdout
369 try:
370 sys.stdout = StringIO()
371 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
372 driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
373 try:
374 download_and_execute(driver, [])
375 assert False
376 except model.SafeException as ex:
377 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
378 raise
379 self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
380 finally:
381 sys.stdout = old_out
383 def testAutopackage(self):
384 old_out = sys.stdout
385 try:
386 sys.stdout = StringIO()
387 run_server('HelloWorld.autopackage')
388 driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
389 try:
390 download_and_execute(driver, [])
391 assert False
392 except model.SafeException as ex:
393 if "HelloWorld/Missing" not in str(ex):
394 raise
395 finally:
396 sys.stdout = old_out
398 def testRecipeFailure(self):
399 old_out = sys.stdout
400 try:
401 sys.stdout = StringIO()
402 run_server('*')
403 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
404 try:
405 download_and_execute(driver, [])
406 assert False
407 except download.DownloadError as ex:
408 if "Connection" not in str(ex):
409 raise
410 finally:
411 sys.stdout = old_out
413 def testMirrors(self):
414 getLogger().setLevel(logging.ERROR)
415 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
416 run_server(server.Give404('/Hello.xml'),
417 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
418 '/0mirror/keys/6FCF121BE2390E0B.gpg',
419 server.Give404('/HelloWorld.tgz'),
420 '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a')
421 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
422 self.config.mirror = 'http://example.com:8000/0mirror'
424 refreshed = driver.solve_with_downloads()
425 tasks.wait_for_blocker(refreshed)
426 assert driver.solver.ready
428 #getLogger().setLevel(logging.WARN)
429 downloaded = driver.download_uncached_implementations()
430 tasks.wait_for_blocker(downloaded)
431 path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
432 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
434 def testReplay(self):
435 old_out = sys.stdout
436 try:
437 sys.stdout = StringIO()
438 getLogger().setLevel(ERROR)
439 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
440 mtime = int(os.stat('Hello-new.xml').st_mtime)
441 with open('Hello-new.xml', 'rb') as stream:
442 self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000)
444 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
445 run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
446 self.config.mirror = 'http://example.com:8000/0mirror'
448 # Update from mirror (should ignore out-of-date timestamp)
449 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
450 tasks.wait_for_blocker(refreshed)
452 # Update from upstream (should report an error)
453 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
454 try:
455 tasks.wait_for_blocker(refreshed)
456 raise Exception("Should have been rejected!")
457 except model.SafeException as ex:
458 assert "New feed's modification time is before old version" in str(ex)
460 # Must finish with the newest version
461 self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
462 finally:
463 sys.stdout = old_out
465 def testBackground(self, verbose = False):
466 r = Requirements('http://example.com:8000/Hello.xml')
467 d = Driver(requirements = r, config = self.config)
468 self.import_feed(r.interface_uri, 'Hello.xml')
469 self.config.freshness = 0
470 self.config.network_use = model.network_minimal
471 d.solver.solve(r.interface_uri, arch.get_host_architecture())
472 assert d.solver.ready, d.solver.get_failure_reason()
474 @tasks.async
475 def choose_download(registed_cb, nid, actions):
476 try:
477 assert actions == ['download', 'Download'], actions
478 registed_cb(nid, 'download')
479 except:
480 import traceback
481 traceback.print_exc()
482 yield None
484 global ran_gui
485 ran_gui = False
486 os.environ['DISPLAY'] = 'dummy'
487 old_out = sys.stdout
488 try:
489 sys.stdout = StringIO()
490 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
491 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
492 my_dbus.user_callback = choose_download
494 with trapped_exit(1):
495 from zeroinstall.injector import config
496 key_info = config.DEFAULT_KEY_LOOKUP_SERVER
497 config.DEFAULT_KEY_LOOKUP_SERVER = None
498 try:
499 background.spawn_background_update(d, verbose)
500 finally:
501 config.DEFAULT_KEY_LOOKUP_SERVER = key_info
502 finally:
503 sys.stdout = old_out
504 assert ran_gui
506 def testBackgroundVerbose(self):
507 self.testBackground(verbose = True)
509 def testBackgroundApp(self):
510 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
512 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
514 global ran_gui
516 with output_suppressed():
517 # Select a version of Hello
518 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
519 r = Requirements('http://example.com:8000/Hello.xml')
520 driver = Driver(requirements = r, config = self.config)
521 tasks.wait_for_blocker(driver.solve_with_downloads())
522 assert driver.solver.ready
523 kill_server_process()
525 # Save it as an app
526 app = self.config.app_mgr.create_app('test-app', r)
527 app.set_selections(driver.solver.selections)
528 timestamp = os.path.join(app.path, 'last-checked')
529 last_check_attempt = os.path.join(app.path, 'last-check-attempt')
530 selections_path = os.path.join(app.path, 'selections.xml')
532 def reset_timestamps():
533 global ran_gui
534 ran_gui = False
535 os.utime(timestamp, (1, 1)) # 1970
536 os.utime(selections_path, (1, 1))
537 if os.path.exists(last_check_attempt):
538 os.unlink(last_check_attempt)
540 # Download the implementation
541 sels = app.get_selections()
542 run_server('HelloWorld.tgz')
543 tasks.wait_for_blocker(app.download_selections(sels))
544 kill_server_process()
546 # Not time for a background update yet
547 self.config.freshness = 100
548 dl = app.download_selections(sels)
549 assert dl == None
550 assert not ran_gui
552 # Trigger a background update - no updates found
553 reset_timestamps()
554 run_server('Hello.xml')
555 with trapped_exit(1):
556 dl = app.download_selections(sels)
557 assert dl == None
558 assert not ran_gui
559 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
560 self.assertEqual(1, os.stat(selections_path).st_mtime)
561 kill_server_process()
563 # Change the selections
564 sels_path = os.path.join(app.path, 'selections.xml')
565 with open(sels_path) as stream:
566 old = stream.read()
567 with open(sels_path, 'w') as stream:
568 stream.write(old.replace('Hello', 'Goodbye'))
570 # Trigger another background update - metadata changes found
571 reset_timestamps()
572 run_server('Hello.xml')
573 with trapped_exit(1):
574 dl = app.download_selections(sels)
575 assert dl == None
576 assert not ran_gui
577 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
578 self.assertNotEqual(1, os.stat(selections_path).st_mtime)
579 kill_server_process()
581 # Trigger another background update - GUI needed now
583 # Delete cached implementation so we need to download it again
584 stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
585 assert os.path.basename(stored).startswith('sha1')
586 ro_rmtree(stored)
588 # Replace with a valid local feed so we don't have to download immediately
589 with open(sels_path, 'w') as stream:
590 stream.write(local_hello)
591 sels = app.get_selections()
593 os.environ['DISPLAY'] = 'dummy'
594 reset_timestamps()
595 run_server('Hello.xml')
596 with trapped_exit(1):
597 dl = app.download_selections(sels)
598 assert dl == None
599 assert ran_gui # (so doesn't actually update)
600 kill_server_process()
602 # Now again with no DISPLAY
603 reset_timestamps()
604 del os.environ['DISPLAY']
605 run_server('Hello.xml', 'HelloWorld.tgz')
606 with trapped_exit(1):
607 dl = app.download_selections(sels)
608 assert dl == None
609 assert not ran_gui # (so doesn't actually update)
611 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
612 self.assertNotEqual(1, os.stat(selections_path).st_mtime)
613 kill_server_process()
615 sels = app.get_selections()
616 sel, = sels.selections.values()
617 self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)
619 # Untrust the key
620 trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
622 os.environ['DISPLAY'] = 'dummy'
623 reset_timestamps()
624 run_server('Hello.xml')
625 with trapped_exit(1):
626 #import logging; logging.getLogger().setLevel(logging.INFO)
627 dl = app.download_selections(sels)
628 assert dl == None
629 assert ran_gui
630 kill_server_process()
632 # Update not triggered because of last-check-attempt
633 ran_gui = False
634 os.utime(timestamp, (1, 1)) # 1970
635 os.utime(selections_path, (1, 1))
636 dl = app.download_selections(sels)
637 assert dl == None
638 assert not ran_gui
640 def testAbort(self):
641 dl = download.Download("http://localhost/test.tgz", auto_delete = True)
642 dl.abort()
643 assert dl._aborted.happened
644 assert dl.tempfile is None
646 dl = download.Download("http://localhost/test.tgz", auto_delete = False)
647 path = dl.tempfile.name
648 dl.abort()
649 assert not os.path.exists(path)
650 assert dl._aborted.happened
651 assert dl.tempfile is None
653 if __name__ == '__main__':
654 try:
655 unittest.main()
656 finally:
657 kill_server_process()