Fixed package-implementation scoring
[zeroinstall/solver.git] / tests / testdownload.py
blob ccad3b4a8ecf280691d6d1148cbdc6282cd1a4d4
#!/usr/bin/env python
# Tests for downloading feeds, GPG keys and implementation archives.
# The tests talk to a helper HTTP server started through the `server` module.
from __future__ import with_statement
from basetest import BaseTest, StringIO
import sys, tempfile, os
import unittest
import logging
from logging import getLogger, WARN, ERROR
from contextlib import contextmanager

# Put the parent directory first on the module search path before importing
# zeroinstall (presumably so the source tree next to the tests is the one
# under test -- verify).
sys.path.insert(0, '..')

# Route HTTP requests through a proxy at localhost:8000.
# NOTE(review): presumably this is where the test server listens, which would
# let URLs such as http://example.com:8000/... be answered locally -- confirm
# against the `server` module.
os.environ["http_proxy"] = "localhost:8000"

from zeroinstall import helpers
from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
from zeroinstall.injector.requirements import Requirements
from zeroinstall.injector.driver import Driver
# Store._add_with_helper is replaced by a stub that always returns False.
from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
from zeroinstall.support import basedir, tasks, ro_rmtree
from zeroinstall.injector import fetch
import data
import my_dbus
import server
# Set to True by raise_gui() whenever it is called while DISPLAY is set.
ran_gui = False


def raise_gui(*args, **kwargs):
    """Replacement for helpers.get_selections_gui that never opens a GUI.

    The caller must not have passed use_gui=False.  When DISPLAY is set the
    call is recorded in the module-level ``ran_gui`` flag and None is
    returned.  Otherwise use_gui must be None and helpers.DontUseGUI is
    returned.
    """
    global ran_gui
    gui_mode = kwargs.get('use_gui', True)
    assert gui_mode != False
    if 'DISPLAY' not in os.environ:
        assert gui_mode is None
        return helpers.DontUseGUI
    ran_gui = True
    return None
# Replace background._detach with a stub that returns False.
# NOTE(review): presumably this keeps the background updater inside the test
# process (trapped_exit's comments say the background handler runs in the same
# process as the tests) -- confirm against zeroinstall.injector.background.
background._detach = lambda: False

# A selections document with a single local-path selection for Hello.xml.
# testBackgroundApp writes it over an app's selections.xml.
local_hello = """<?xml version="1.0" ?>
<selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
<selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
</selections>"""
@contextmanager
def output_suppressed():
    """Run the with-body with sys.stdout and sys.stderr sent to StringIO buffers.

    Ordinary exceptions propagate unchanged.  Any other BaseException (for
    example SystemExit) is re-raised as a plain Exception whose message is the
    original type followed by its text.  The real streams are always restored.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        yield
    except BaseException as ex:
        if isinstance(ex, Exception):
            raise
        # Don't abort unit-tests if someone raises SystemExit
        raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
@contextmanager
def trapped_exit(expected_exit_status):
    """Require the with-body to call os._exit(expected_exit_status).

    While active, os._exit is replaced: in the process that entered the
    context it raises SystemExit instead of exiting, in any other process it
    calls the real os._exit.  The body fails an assertion if it finishes
    without exiting or exits with a different status.  os._exit is restored
    afterwards.
    """
    test_pid = os.getpid()
    real_exit = os._exit

    def fake_exit(code):
        if os.getpid() != test_pid:
            # Child download processes are allowed to exit for real.
            real_exit(code)
            return
        # The background handler runs in the same process as the tests,
        # so turn its exit into an exception instead of aborting the run.
        raise SystemExit(code)

    os._exit = fake_exit
    try:
        yield
    except SystemExit as ex:
        assert ex.code == expected_exit_status
    else:
        assert False
    finally:
        os._exit = real_exit
class Reply:
    """Minimal stdin substitute: every readline() call gives the same answer."""

    def __init__(self, reply):
        # The text handed back by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply (it is never used up)."""
        return self.reply
def download_and_execute(driver, prog_args, main = None):
    """Solve and download for `driver`, then execute the chosen selections.

    prog_args and main are passed on to run.execute_selections, together with
    the stores from the driver's configuration.
    """
    driver_download(driver)
    chosen = driver.solver.selections
    run.execute_selections(chosen, prog_args, stores = driver.config.stores, main = main)
def driver_download(driver):
    """Call driver.solve_and_download_impls() and wait for its result, if any.

    A false result is treated as nothing to wait for; anything else is passed
    to tasks.wait_for_blocker().
    """
    blocker = driver.solve_and_download_impls()
    if not blocker:
        return
    tasks.wait_for_blocker(blocker)
class NetworkManager:
    """Fake NetworkManager service object registered with my_dbus by the tests."""

    def state(self):
        """Always report state 3 (NM_STATUS_CONNECTED)."""
        connected = 3 # NM_STATUS_CONNECTED
        return connected
# The running test-server process, or None when no server is running.
server_process = None


def kill_server_process():
    """Stop the test server, if one is running, and forget about it.

    Closes the process's stdout, kills it, waits for it and resets the
    module-level ``server_process`` to None.  Does nothing when no server is
    recorded.
    """
    global server_process
    proc = server_process
    if proc is None:
        return

    # The process may still be running. See
    # http://bugs.python.org/issue14252 for why this is so
    # complicated.
    proc.stdout.close()
    if os.name == 'nt':
        try:
            proc.kill()
        except WindowsError as e:
            # This is what happens when terminate
            # is called after the process has died.
            already_dead = e.winerror == 5 and e.strerror == 'Access is denied'
            if not already_dead:
                raise
            assert not proc.poll()
    else:
        proc.kill()
    proc.wait()
    server_process = None
def run_server(*args):
    """Start the test server and record it in the module-level server_process.

    All arguments are handed to server.handle_requests().  It is an error
    (assertion failure) to call this while a previous server is still
    recorded.
    """
    global server_process
    assert server_process is None
    started = server.handle_requests(*args)
    server_process = started
# Keep the genuine helpers.get_selections_gui: TestDownload.setUp replaces it
# with raise_gui and TestDownload.tearDown puts this one back.
real_get_selections_gui = helpers.get_selections_gui
class TestDownload(BaseTest):
    """Tests that fetch feeds, keys and archives from the local test server.

    Tests start the server with run_server(); tearDown() always stops it.
    Comments are used instead of docstrings on the test methods so that
    unittest keeps reporting tests by method name.
    """

    def setUp(self):
        # Configure the BaseTest config for downloading, import the test GPG
        # key and route GUI requests to raise_gui().
        global ran_gui

        # Tests replace sys.stdin with Reply objects (or None); remember the
        # real one so that tearDown() can put it back.
        self._real_stdin = sys.stdin

        BaseTest.setUp(self)

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        # The context manager closes the temporary file even if writing or
        # importing the key fails.
        with tempfile.TemporaryFile() as stream:
            stream.write(data.thomas_key)
            stream.seek(0)
            gpg.import_key(stream)

        trust.trust_db.watchers = []

        helpers.get_selections_gui = raise_gui
        ran_gui = False

    def tearDown(self):
        # Undo the patches made by setUp/the tests.  The server is stopped
        # even if the base-class tear-down fails, because run_server()
        # asserts that no earlier server is still recorded.
        try:
            helpers.get_selections_gui = real_get_selections_gui
            sys.stdin = self._real_stdin
            BaseTest.tearDown(self)
        finally:
            kill_server_process()

    def testRejectKey(self):
        # Answering "N" to the key prompt must leave no usable implementation.
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise ex
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    raise self.config.handler.ex
                self.config.handler.ex = None

    def testRejectKeyXML(self):
        # Same as testRejectKey, for a feed published as Hello.xml.
        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise ex
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    # Report the handler's unexpected error, as testRejectKey
                    # does; a bare "raise" here would re-raise the (expected)
                    # solver error instead and hide the real problem.
                    raise self.config.handler.ex
                self.config.handler.ex = None

    def testImport(self):
        # --import of a missing file fails; importing 'Hello' prompts for the
        # key once and then trusts it.
        from zeroinstall.injector import cli

        rootLogger = getLogger()
        rootLogger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
                assert 0
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            rootLogger.disabled = False
            rootLogger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEqual(None, hello)

        with output_suppressed():
            run_server('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config = self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
            self.assertEqual(1, len(hello.implementations))

            self.assertEqual(None, hello.local_path)

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config = self.config)

    def testSelections(self):
        # --download-only on a selections file fetches the missing archive.
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            # The implementation must not be in the store beforehand.
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                assert False
            except NotStored:
                pass
            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testHelpers(self):
        # helpers.ensure_cached() downloads the implementation into the store.
        from zeroinstall import helpers

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    def testSelectionsWithFeed(self):
        # Like testSelections, but the feed is imported before downloading.
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testAcceptKey(self):
        # Answering "Y" lets the download go ahead; running then fails only
        # because the requested main ('Missing') does not exist.
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testAutoAcceptKey(self):
        # With auto_approve_keys set, an empty reply on stdin is enough.
        self.config.auto_approve_keys = True
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testDistro(self):
        # Solving Native.xml downloads the master feed and yields a distro
        # feed with two implementations.
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements(native_url), config = self.config)
            assert driver.need_download()

            solve = driver.solve_with_downloads()
            tasks.wait_for_blocker(solve)
            tasks.check(solve)

            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    def testWrongSize(self):
        # An archive whose size differs from the feed's is rejected.
        with output_suppressed():
            run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                       '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise ex

    def testRecipe(self):
        # The recipe's archives are all fetched; execution then fails on the
        # missing main.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise ex
        finally:
            sys.stdout = old_out

    def testRename(self):
        # After the rename recipe only HelloUniverse/minor exists.
        with output_suppressed():
            run_server(('HelloWorld.tar.bz2',))
            requirements = Requirements(os.path.abspath('RecipeRename.xml'))
            requirements.command = None
            driver = Driver(requirements = requirements, config = self.config)
            driver_download(driver)
            digests = driver.solver.selections[requirements.interface_uri].digests
            path = self.config.stores.lookup_any(digests)
            assert os.path.exists(os.path.join(path, 'HelloUniverse', 'minor'))
            assert not os.path.exists(os.path.join(path, 'HelloWorld'))
            assert not os.path.exists(os.path.join(path, 'HelloUniverse', 'main'))

    def testSymlink(self):
        # Unpacking a directory over a symlink must be refused.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        # An autopackage archive is fetched; execution fails on the missing main.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('HelloWorld.autopackage')
            driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        # A failing download surfaces as a DownloadError mentioning "Connection".
        # Note: stdout is saved and restored here although this test does not
        # redirect it itself.
        old_out = sys.stdout
        try:
            run_server('*')
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except download.DownloadError as ex:
                if "Connection" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testMirrors(self):
        # Feed and archive give 404 at the primary location and are fetched
        # from the mirror instead.
        getLogger().setLevel(logging.ERROR)
        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server(server.Give404('/Hello.xml'),
                   '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
                   '/0mirror/keys/6FCF121BE2390E0B.gpg',
                   server.Give404('/HelloWorld.tgz'),
                   '/0mirror/archive/http%3A%23%23example.com%3A8000%23HelloWorld.tgz')
        driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
        self.config.mirror = 'http://example.com:8000/0mirror'

        refreshed = driver.solve_with_downloads()
        tasks.wait_for_blocker(refreshed)
        assert driver.solver.ready

        #getLogger().setLevel(logging.WARN)
        downloaded = driver.download_uncached_implementations()
        tasks.wait_for_blocker(downloaded)
        path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

    def testImplMirror(self):
        # This is like testMirror, except we have a different archive (that generates the same content),
        # rather than an exact copy of the unavailable archive.
        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server('/Hello.xml',
                   '/6FCF121BE2390E0B.gpg',
                   server.Give404('/HelloWorld.tgz'),
                   server.Give404('/0mirror/archive/http%3A%2F%2Flocalhost%3A8000%2FHelloWorld.tgz'),
                   '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a')
        driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
        self.config.mirror = 'http://example.com:8000/0mirror'

        refreshed = driver.solve_with_downloads()
        tasks.wait_for_blocker(refreshed)
        assert driver.solver.ready

        getLogger().setLevel(logging.ERROR)
        downloaded = driver.download_uncached_implementations()
        tasks.wait_for_blocker(downloaded)
        path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

    def testReplay(self):
        # An older feed offered after a newer one has been cached must not
        # replace it.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
            mtime = int(os.stat('Hello-new.xml').st_mtime)
            with open('Hello-new.xml', 'rb') as stream:
                self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000)

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
            self.config.mirror = 'http://example.com:8000/0mirror'

            # Update from mirror (should ignore out-of-date timestamp)
            refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            tasks.wait_for_blocker(refreshed)

            # Update from upstream (should report an error)
            refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            try:
                tasks.wait_for_blocker(refreshed)
                raise Exception("Should have been rejected!")
            except model.SafeException as ex:
                assert "New feed's modification time is before old version" in str(ex)

            # Must finish with the newest version
            self.assertEqual(1342285569, self.config.iface_cache._get_signature_date(iface.uri))
        finally:
            sys.stdout = old_out

    def testBackground(self, verbose = False):
        # A background update that needs a download must exit with status 1
        # after asking for the GUI.
        global ran_gui

        r = Requirements('http://example.com:8000/Hello.xml')
        d = Driver(requirements = r, config = self.config)
        self.import_feed(r.interface_uri, 'Hello.xml')
        self.config.freshness = 0
        self.config.network_use = model.network_minimal
        d.solver.solve(r.interface_uri, arch.get_host_architecture())
        assert d.solver.ready, d.solver.get_failure_reason()

        # "async" is a reserved word from Python 3.7 on, so "@tasks.async"
        # no longer parses there.  Looking the attribute up by name gives the
        # very same decorator and is valid syntax on every Python version.
        @getattr(tasks, 'async')
        def choose_download(registed_cb, nid, actions):
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except:
                # Best effort: just report problems raised inside the callback.
                import traceback
                traceback.print_exc()
            yield None

        ran_gui = False
        os.environ['DISPLAY'] = 'dummy'
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download

            with trapped_exit(1):
                from zeroinstall.injector import config
                # Temporarily clear the default key lookup server.
                key_info = config.DEFAULT_KEY_LOOKUP_SERVER
                config.DEFAULT_KEY_LOOKUP_SERVER = None
                try:
                    background.spawn_background_update(d, verbose)
                finally:
                    config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        # Same scenario as testBackground, with verbose output enabled.
        self.testBackground(verbose = True)

    def testBackgroundApp(self):
        # Walk an app through a series of background updates and check when
        # the timestamps, the saved selections and the GUI flag change.
        global ran_gui

        my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

        with output_suppressed():
            # Select a version of Hello
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
            r = Requirements('http://example.com:8000/Hello.xml')
            driver = Driver(requirements = r, config = self.config)
            tasks.wait_for_blocker(driver.solve_with_downloads())
            assert driver.solver.ready
            kill_server_process()

            # Save it as an app
            app = self.config.app_mgr.create_app('test-app', r)
            app.set_selections(driver.solver.selections)
            timestamp = os.path.join(app.path, 'last-checked')
            last_check_attempt = os.path.join(app.path, 'last-check-attempt')
            selections_path = os.path.join(app.path, 'selections.xml')

            def reset_timestamps():
                # Make the app look long overdue for an update check.
                global ran_gui
                ran_gui = False
                os.utime(timestamp, (1, 1)) # 1970
                os.utime(selections_path, (1, 1))
                if os.path.exists(last_check_attempt):
                    os.unlink(last_check_attempt)

            # Download the implementation
            sels = app.get_selections()
            run_server('HelloWorld.tgz')
            tasks.wait_for_blocker(app.download_selections(sels))
            kill_server_process()

            # Not time for a background update yet
            self.config.freshness = 100
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

            # Trigger a background update - no updates found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Change the selections
            sels_path = os.path.join(app.path, 'selections.xml')
            with open(sels_path) as stream:
                old = stream.read()
            with open(sels_path, 'w') as stream:
                stream.write(old.replace('Hello', 'Goodbye'))

            # Trigger another background update - metadata changes found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Trigger another background update - GUI needed now

            # Delete cached implementation so we need to download it again
            stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
            assert os.path.basename(stored).startswith('sha1')
            ro_rmtree(stored)

            # Replace with a valid local feed so we don't have to download immediately
            with open(sels_path, 'w') as stream:
                stream.write(local_hello)
            sels = app.get_selections()

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui # (so doesn't actually update)
            kill_server_process()

            # Now again with no DISPLAY
            reset_timestamps()
            del os.environ['DISPLAY']
            run_server('Hello.xml', 'HelloWorld.tgz')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui # (so doesn't actually update)

            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            sels = app.get_selections()
            sel, = sels.selections.values()
            self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)

            # Untrust the key
            trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                #import logging; logging.getLogger().setLevel(logging.INFO)
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui
            kill_server_process()

            # Update not triggered because of last-check-attempt
            ran_gui = False
            os.utime(timestamp, (1, 1)) # 1970
            os.utime(selections_path, (1, 1))
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

    def testAbort(self):
        # abort() marks the download as aborted and drops its temporary file,
        # with and without auto_delete.
        dl = download.Download("http://localhost/test.tgz", auto_delete = True)
        dl.abort()
        assert dl._aborted.happened
        assert dl.tempfile is None

        dl = download.Download("http://localhost/test.tgz", auto_delete = False)
        path = dl.tempfile.name
        dl.abort()
        assert not os.path.exists(path)
        assert dl._aborted.happened
        assert dl.tempfile is None
# When run as a script, run the tests and make sure the test server is
# stopped afterwards, however unittest.main() finishes.
if __name__ == '__main__':
    try:
        unittest.main()
    finally:
        kill_server_process()