Added test for downloading icons
[zeroinstall/solver.git] / tests / testdownload.py
blobbc9e6d03613bb0999f2dbed0139f44853d8df9fd
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest, StringIO
4 import sys, tempfile, os
5 import unittest
6 import logging, warnings
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall import helpers
15 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
16 from zeroinstall.injector.requirements import Requirements
17 from zeroinstall.injector.driver import Driver
18 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
19 from zeroinstall.support import basedir, tasks, ro_rmtree
20 from zeroinstall.injector import fetch
21 import data
22 import my_dbus
24 import server
# Flipped to True by raise_gui() when a display is available; tests reset it
# and then check whether the (fake) GUI was asked for.
ran_gui = False

def raise_gui(*args, **kwargs):
    """Stand-in for helpers.get_selections_gui (installed by TestDownload.setUp).

    Records in the module-level ``ran_gui`` flag that the GUI was requested
    while DISPLAY is set. Without DISPLAY, the caller must have passed
    ``use_gui=None`` and gets ``helpers.DontUseGUI`` back.

    NOTE(review): the listing this was recovered from had lost its
    indentation; the ``return`` is assumed to belong to the no-DISPLAY
    branch only (so None is returned when DISPLAY is set) -- confirm.
    """
    global ran_gui
    gui_requested = kwargs.get('use_gui', True)
    # Callers must never explicitly disable the GUI on this path.
    assert gui_requested != False
    if 'DISPLAY' not in os.environ:
        assert gui_requested is None
        return helpers.DontUseGUI
    ran_gui = True
def _stay_attached():
    """Replacement for background._detach that always reports False.

    NOTE(review): presumably this keeps the background updater inside the
    test process (trapped_exit relies on it running in-process) -- confirm
    against zeroinstall.injector.background.
    """
    return False

background._detach = _stay_attached
# Selections document with a single local selection ('.') for Hello.xml.
# testBackgroundApp writes it over an app's selections.xml.
local_hello = "\n".join([
    '<?xml version="1.0" ?>',
    '<selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">',
    '''<selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>''',
    '</selections>',
])
@contextmanager
def output_suppressed():
    """Run the enclosed block with sys.stdout and sys.stderr swapped for
    throw-away StringIO buffers; both are put back afterwards.

    Ordinary exceptions pass through untouched. Anything that is a
    BaseException but not an Exception (e.g. SystemExit) is converted into a
    plain Exception carrying the original type and message.
    """
    saved_out = sys.stdout
    saved_err = sys.stderr
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        yield
    except Exception:
        raise
    except BaseException as ex:
        # Don't abort unit-tests if someone raises SystemExit
        raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout = saved_out
        sys.stderr = saved_err
@contextmanager
def trapped_exit(expected_exit_status):
    """Require the enclosed block to call os._exit(expected_exit_status).

    While active, os._exit is replaced: in the process that entered the
    context it raises SystemExit instead of terminating; in any other
    process it still exits for real. Fails with AssertionError if the block
    finishes without exiting or exits with a different status.
    """
    test_pid = os.getpid()
    real_exit = os._exit

    def exit_or_raise(status):
        if os.getpid() != test_pid:
            # But, child download processes are OK
            real_exit(status)
            return
        # The background handler runs in the same process
        # as the tests, so don't let it abort.
        raise SystemExit(status)

    os._exit = exit_or_raise
    try:
        yield
        assert False
    except SystemExit as ex:
        assert ex.code == expected_exit_status
    finally:
        os._exit = real_exit
@contextmanager
def resourcewarnings_suppressed():
    """Run the enclosed block with ResourceWarning ignored (Python 3 only).

    On Python 2 (which has no ResourceWarning) the block simply runs. On
    Python 3 a garbage collection is forced before the warning filters are
    restored.
    """
    import gc
    if sys.version_info[0] < 3:
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category = ResourceWarning)
        yield
        # Collect while the filter is still installed.
        gc.collect()
class Reply:
    """Tiny sys.stdin substitute that answers every readline() with the
    same canned text."""

    def __init__(self, reply):
        # Text returned, unchanged, by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply given to the constructor."""
        return self.reply
def download_and_execute(driver, prog_args, main = None):
    """Solve and download via driver_download(), then run the result.

    driver    -- object handed to driver_download(); its solver.selections
                 and config.stores are passed to run.execute_selections
    prog_args -- arguments forwarded to run.execute_selections
    main      -- forwarded as the 'main' keyword of run.execute_selections
    """
    driver_download(driver)
    chosen = driver.solver.selections
    run.execute_selections(
        chosen,
        prog_args,
        stores = driver.config.stores,
        main = main)
def driver_download(driver):
    """Call driver.solve_and_download_impls() and, if it hands back
    something truthy, wait on it with tasks.wait_for_blocker."""
    pending = driver.solve_and_download_impls()
    if not pending:
        # Nothing to wait for.
        return
    tasks.wait_for_blocker(pending)
class NetworkManager:
    """Fake object registered in my_dbus.system_services under the
    org.freedesktop.NetworkManager name by the background-update tests."""

    def state(self):
        """Always report status code 3."""
        connected = 3 # NM_STATUS_CONNECTED
        return connected
# Handle of the helper server started by run_server(), or None when no
# server is running.
server_process = None

def kill_server_process():
    """Stop the helper server started by run_server(), if there is one.

    Closes the server's stdout, kills it and waits for it. The module-level
    ``server_process`` is reset to None even when one of those steps raises,
    so a failed shutdown cannot make every later run_server() call trip its
    "no server running" assertion. Does nothing when no server is recorded.
    """
    global server_process
    if server_process is None:
        return
    process = server_process
    try:
        # The process may still be running. See
        # http://bugs.python.org/issue14252 for why this is so
        # complicated.
        process.stdout.close()
        if os.name != 'nt':
            process.kill()
        else:
            try:
                process.kill()
            except WindowsError as e:
                # This is what happens when terminate
                # is called after the process has died.
                if e.winerror == 5 and e.strerror == 'Access is denied':
                    assert not process.poll()
                else:
                    raise
        process.wait()
    finally:
        # Always forget the handle, whatever happened above.
        server_process = None
def run_server(*args):
    """Start the helper server via server.handle_requests(*args) and record
    its handle in the module-level ``server_process``.

    Asserts that no previously started server is still recorded.
    """
    global server_process
    assert server_process is None
    started = server.handle_requests(*args)
    server_process = started
# The genuine GUI launcher, captured at import time so that tearDown() can
# undo the raise_gui substitution made in setUp().
real_get_selections_gui = helpers.get_selections_gui

class TestDownload(BaseTest):
    """Download tests run against the local helper server (see run_server).

    Each test starts the server with the list of resources it expects to be
    requested; tearDown() stops it again. Prompts are answered by replacing
    sys.stdin with a Reply object.
    """

    def setUp(self):
        """Prepare config, GPG key, fake GUI hook and the ran_gui flag."""
        global ran_gui

        # Tests replace sys.stdin and set/delete DISPLAY; remember the
        # originals so tearDown() can put them back.
        self._saved_stdin = sys.stdin
        self._saved_display = os.environ.get('DISPLAY')

        BaseTest.setUp(self)

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        stream = tempfile.TemporaryFile()
        try:
            stream.write(data.thomas_key)
            stream.seek(0)
            gpg.import_key(stream)
        finally:
            # Close the temporary file even if the import fails.
            stream.close()

        trust.trust_db.watchers = []

        helpers.get_selections_gui = raise_gui

        ran_gui = False

    def tearDown(self):
        """Undo setUp() and always stop the helper server."""
        helpers.get_selections_gui = real_get_selections_gui
        sys.stdin = self._saved_stdin
        if self._saved_display is None:
            os.environ.pop('DISPLAY', None)
        else:
            os.environ['DISPLAY'] = self._saved_display
        try:
            BaseTest.tearDown(self)
        finally:
            # Must run even if the base-class teardown fails, otherwise the
            # next run_server() would find a stale server_process.
            kill_server_process()

            # Flush out ResourceWarnings
            import gc; gc.collect()

    def testRejectKey(self):
        """Replying "N" leaves the feed with no usable implementations."""
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise ex
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    raise self.config.handler.ex
                self.config.handler.ex = None

    def testRejectKeyXML(self):
        """As testRejectKey, but for the example.com Hello.xml feed."""
        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise ex
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    raise
                self.config.handler.ex = None

    def testImport(self):
        """--import fails on a missing file and trusts the key after "Y"."""
        from zeroinstall.injector import cli

        rootLogger = getLogger()
        rootLogger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
                assert 0
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            rootLogger.disabled = False
            rootLogger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEqual(None, hello)

        with output_suppressed():
            run_server('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config = self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
            self.assertEqual(1, len(hello.implementations))

            self.assertEqual(None, hello.local_path)

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config = self.config)

    def testSelections(self):
        """--download-only on selections.xml fetches the missing implementation."""
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                assert False
            except NotStored:
                pass
            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testHelpers(self):
        """helpers.ensure_cached downloads Hello.xml's implementation."""
        from zeroinstall import helpers

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    def testSelectionsWithFeed(self):
        """As testSelections, but the feed is imported before downloading."""
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testAcceptKey(self):
        """Replying "Y" lets the download proceed as far as the missing main."""
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testAutoAcceptKey(self):
        """With auto_approve_keys set, an empty reply is enough."""
        self.config.auto_approve_keys = True
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testDistro(self):
        """Solving Native.xml yields an empty master feed plus a distro feed."""
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements(native_url), config = self.config)
            assert driver.need_download()

            solve = driver.solve_with_downloads()
            tasks.wait_for_blocker(solve)
            tasks.check(solve)

            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    def testWrongSize(self):
        """An archive whose size differs from the feed's is rejected."""
        with output_suppressed():
            run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                            '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise ex

    def testRecipe(self):
        """Recipe.xml downloads all its archives, then fails on the missing main."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise ex
        finally:
            sys.stdout = old_out

    def testRename(self):
        """RecipeRename.xml leaves HelloUniverse/minor and no HelloWorld."""
        with output_suppressed():
            run_server(('HelloWorld.tar.bz2',))
            requirements = Requirements(os.path.abspath('RecipeRename.xml'))
            requirements.command = None
            driver = Driver(requirements = requirements, config = self.config)
            driver_download(driver)
            digests = driver.solver.selections[requirements.interface_uri].digests
            path = self.config.stores.lookup_any(digests)
            assert os.path.exists(os.path.join(path, 'HelloUniverse', 'minor'))
            assert not os.path.exists(os.path.join(path, 'HelloWorld'))
            assert not os.path.exists(os.path.join(path, 'HelloUniverse', 'main'))

    def testSymlink(self):
        """Unpacking a directory over a symlink is refused; nothing is cached."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        """Autopackage.xml downloads, then fails on the missing main."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('HelloWorld.autopackage')
            driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        """A failing recipe download surfaces as a "Connection" DownloadError."""
        with resourcewarnings_suppressed():
            # sys.stdout is not replaced in this test; the restore below
            # only guards against the code under test changing it.
            old_out = sys.stdout
            try:
                run_server('*')
                driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
                try:
                    download_and_execute(driver, [])
                    assert False
                except download.DownloadError as ex:
                    if "Connection" not in str(ex):
                        raise
            finally:
                sys.stdout = old_out

    def testMirrors(self):
        """Feed, key and archive are all fetched from the mirror after 404s."""
        with resourcewarnings_suppressed():
            getLogger().setLevel(logging.ERROR)
            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'),
                    '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
                    '/0mirror/keys/6FCF121BE2390E0B.gpg',
                    server.Give404('/HelloWorld.tgz'),
                    '/0mirror/archive/http%3A%23%23example.com%3A8000%23HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            self.config.mirror = 'http://example.com:8000/0mirror'

            refreshed = driver.solve_with_downloads()
            tasks.wait_for_blocker(refreshed)
            assert driver.solver.ready

            #getLogger().setLevel(logging.WARN)
            downloaded = driver.download_uncached_implementations()
            tasks.wait_for_blocker(downloaded)
            path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

    def testImplMirror(self):
        """The implementation is fetched from the mirror's per-digest location."""
        with resourcewarnings_suppressed():
            # This is like testMirror, except we have a different archive (that generates the same content),
            # rather than an exact copy of the unavailable archive.
            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('/Hello.xml',
                    '/6FCF121BE2390E0B.gpg',
                    server.Give404('/HelloWorld.tgz'),
                    server.Give404('/0mirror/archive/http%3A%2F%2Flocalhost%3A8000%2FHelloWorld.tgz'),
                    '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            self.config.mirror = 'http://example.com:8000/0mirror'

            refreshed = driver.solve_with_downloads()
            tasks.wait_for_blocker(refreshed)
            assert driver.solver.ready

            getLogger().setLevel(logging.ERROR)
            downloaded = driver.download_uncached_implementations()
            tasks.wait_for_blocker(downloaded)
            path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

    def testReplay(self):
        """An older upstream feed must not replace a newer cached one."""
        with resourcewarnings_suppressed():
            old_out = sys.stdout
            try:
                sys.stdout = StringIO()
                getLogger().setLevel(ERROR)
                iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
                mtime = int(os.stat('Hello-new.xml').st_mtime)
                with open('Hello-new.xml', 'rb') as stream:
                    self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000)

                trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
                run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
                self.config.mirror = 'http://example.com:8000/0mirror'

                # Update from mirror (should ignore out-of-date timestamp)
                refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
                tasks.wait_for_blocker(refreshed)

                # Update from upstream (should report an error)
                refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
                try:
                    tasks.wait_for_blocker(refreshed)
                    raise Exception("Should have been rejected!")
                except model.SafeException as ex:
                    assert "New feed's modification time is before old version" in str(ex)

                # Must finish with the newest version
                self.assertEqual(1342285569, self.config.iface_cache._get_signature_date(iface.uri))
            finally:
                sys.stdout = old_out

    def testBackground(self, verbose = False):
        """A background update with DISPLAY set exits with status 1 after
        asking for the GUI (raise_gui sets ran_gui)."""
        global ran_gui

        r = Requirements('http://example.com:8000/Hello.xml')
        d = Driver(requirements = r, config = self.config)
        self.import_feed(r.interface_uri, 'Hello.xml')
        self.config.freshness = 0
        self.config.network_use = model.network_minimal
        d.solver.solve(r.interface_uri, arch.get_host_architecture())
        assert d.solver.ready, d.solver.get_failure_reason()

        # 'async' is a reserved word from Python 3.7 on, so "tasks.async"
        # no longer parses; look the decorator up by name instead.
        @getattr(tasks, 'async')
        def choose_download(registed_cb, nid, actions):
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except:
                # Deliberately broad: report the problem and carry on.
                import traceback
                traceback.print_exc()
            yield None

        ran_gui = False
        os.environ['DISPLAY'] = 'dummy'
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download

            with trapped_exit(1):
                from zeroinstall.injector import config
                key_info = config.DEFAULT_KEY_LOOKUP_SERVER
                config.DEFAULT_KEY_LOOKUP_SERVER = None
                try:
                    background.spawn_background_update(d, verbose)
                finally:
                    config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        """Same scenario as testBackground, with verbose = True."""
        self.testBackground(verbose = True)

    def testBackgroundApp(self):
        """Walk an app through a series of background-update scenarios."""
        global ran_gui

        my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

        with output_suppressed():
            # Select a version of Hello
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
            r = Requirements('http://example.com:8000/Hello.xml')
            driver = Driver(requirements = r, config = self.config)
            tasks.wait_for_blocker(driver.solve_with_downloads())
            assert driver.solver.ready
            kill_server_process()

            # Save it as an app
            app = self.config.app_mgr.create_app('test-app', r)
            app.set_selections(driver.solver.selections)
            timestamp = os.path.join(app.path, 'last-checked')
            last_check_attempt = os.path.join(app.path, 'last-check-attempt')
            selections_path = os.path.join(app.path, 'selections.xml')

            def reset_timestamps():
                # Make the app look long overdue for an update check.
                global ran_gui
                ran_gui = False
                os.utime(timestamp, (1, 1))             # 1970
                os.utime(selections_path, (1, 1))
                if os.path.exists(last_check_attempt):
                    os.unlink(last_check_attempt)

            # Download the implementation
            sels = app.get_selections()
            run_server('HelloWorld.tgz')
            tasks.wait_for_blocker(app.download_selections(sels))
            kill_server_process()

            # Not time for a background update yet
            self.config.freshness = 100
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

            # Trigger a background update - no updates found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Change the selections
            sels_path = os.path.join(app.path, 'selections.xml')
            with open(sels_path) as stream:
                old = stream.read()
            with open(sels_path, 'w') as stream:
                stream.write(old.replace('Hello', 'Goodbye'))

            # Trigger another background update - metadata changes found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Trigger another background update - GUI needed now

            # Delete cached implementation so we need to download it again
            stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
            assert os.path.basename(stored).startswith('sha1')
            ro_rmtree(stored)

            # Replace with a valid local feed so we don't have to download immediately
            with open(sels_path, 'w') as stream:
                stream.write(local_hello)
            sels = app.get_selections()

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui  # (so doesn't actually update)
            kill_server_process()

            # Now again with no DISPLAY
            reset_timestamps()
            del os.environ['DISPLAY']
            run_server('Hello.xml', 'HelloWorld.tgz')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui  # (so doesn't actually update)

            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            sels = app.get_selections()
            sel, = sels.selections.values()
            self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)

            # Untrust the key
            trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                #import logging; logging.getLogger().setLevel(logging.INFO)
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui
            kill_server_process()

            # Update not triggered because of last-check-attempt
            ran_gui = False
            os.utime(timestamp, (1, 1))         # 1970
            os.utime(selections_path, (1, 1))
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

    def testAbort(self):
        """abort() marks the download aborted and drops its temp file."""
        dl = download.Download("http://localhost/test.tgz", auto_delete = True)
        dl.abort()
        assert dl._aborted.happened
        assert dl.tempfile is None

        dl = download.Download("http://localhost/test.tgz", auto_delete = False)
        path = dl.tempfile.name
        dl.abort()
        assert not os.path.exists(path)
        assert dl._aborted.happened
        assert dl.tempfile is None

    def testDownloadIconFails(self):
        """A missing icon URL is reported as a DownloadError."""
        mydir = os.path.dirname(os.path.abspath(__file__))
        path = model.canonical_iface_uri(os.path.join(mydir, 'Binary.xml'))
        iface = self.config.iface_cache.get_interface(path)
        blocker = self.config.fetcher.download_icon(iface)
        try:
            tasks.wait_for_blocker(blocker)
            assert False
        except download.DownloadError as ex:
            assert "Error downloading http://localhost/missing.png" in str(ex), ex
# Script entry point: run the tests in this module.
if __name__ == '__main__':
    try:
        unittest.main()
    finally:
        # Runs however unittest.main() ends, so no helper server
        # process is left behind.
        kill_server_process()