2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
import helpers
15 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
16 from zeroinstall
.injector
.requirements
import Requirements
17 from zeroinstall
.injector
.driver
import Driver
18 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
19 from zeroinstall
.support
import basedir
, tasks
, ro_rmtree
20 from zeroinstall
.injector
import fetch
30 background
._detach
= lambda: False
32 local_hello
= """<?xml version="1.0" ?>
33 <selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
34 <selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
38 def output_suppressed():
39 old_stdout
= sys
.stdout
40 old_stderr
= sys
.stderr
42 sys
.stdout
= StringIO()
43 sys
.stderr
= StringIO()
48 except BaseException
as ex
:
49 # Don't abort unit-tests if someone raises SystemExit
50 raise Exception(str(type(ex
)) + " " + str(ex
))
52 sys
.stdout
= old_stdout
53 sys
.stderr
= old_stderr
56 def trapped_exit(expected_exit_status
):
60 # The background handler runs in the same process
61 # as the tests, so don't let it abort.
62 if os
.getpid() == pid
:
63 raise SystemExit(code
)
64 # But, child download processes are OK
71 except SystemExit as ex
:
72 assert ex
.code
== expected_exit_status
77 def __init__(self
, reply
):
83 def download_and_execute(driver
, prog_args
, main
= None):
84 downloaded
= driver
.solve_and_download_impls()
86 tasks
.wait_for_blocker(downloaded
)
87 run
.execute_selections(driver
.solver
.selections
, prog_args
, stores
= driver
.config
.stores
, main
= main
)
91 return 3 # NM_STATUS_CONNECTED
94 def kill_server_process():
96 if server_process
is not None:
97 # The process may still be running. See
98 # http://bugs.python.org/issue14252 for why this is so
101 server_process
.kill()
104 server_process
.kill()
105 except WindowsError, e
:
106 # This is what happens when terminate
107 # is called after the process has died.
108 if e
.winerror
== 5 and e
.strerror
== 'Access is denied':
109 assert not server_process
.poll()
112 server_process
.wait()
113 server_process
= None
def run_server(*args):
    """Launch the test server and remember its handle in ``server_process``.

    All positional arguments are forwarded unchanged to
    ``server.handle_requests``; whatever it returns is stored in the
    module-level ``server_process`` global (the handle that
    ``kill_server_process`` later kills, waits on and resets to None).

    Fails the assertion if a previously started server has not been
    cleaned up yet, so at most one server handle is tracked at a time.
    """
    global server_process
    # A non-None value means an earlier server was never killed.
    assert server_process is None
    started = server.handle_requests(*args)
    server_process = started
120 real_get_selections_gui
= helpers
.get_selections_gui
122 class TestDownload(BaseTest
):
126 self
.config
.handler
.allow_downloads
= True
127 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
129 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
131 stream
= tempfile
.TemporaryFile()
132 stream
.write(data
.thomas_key
)
134 gpg
.import_key(stream
)
137 trust
.trust_db
.watchers
= []
139 helpers
.get_selections_gui
= raise_gui
145 helpers
.get_selections_gui
= real_get_selections_gui
146 BaseTest
.tearDown(self
)
147 kill_server_process()
149 def testRejectKey(self
):
150 with
output_suppressed():
151 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
152 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
153 assert driver
.need_download()
154 sys
.stdin
= Reply("N\n")
156 download_and_execute(driver
, ['Hello'])
158 except model
.SafeException
as ex
:
159 if "has no usable implementations" not in str(ex
):
161 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
163 self
.config
.handler
.ex
= None
165 def testRejectKeyXML(self
):
166 with
output_suppressed():
167 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
168 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
169 assert driver
.need_download()
170 sys
.stdin
= Reply("N\n")
172 download_and_execute(driver
, ['Hello'])
174 except model
.SafeException
as ex
:
175 if "has no usable implementations" not in str(ex
):
177 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
179 self
.config
.handler
.ex
= None
181 def testImport(self
):
182 from zeroinstall
.injector
import cli
184 rootLogger
= getLogger()
185 rootLogger
.disabled
= True
188 cli
.main(['--import', '-v', 'NO-SUCH-FILE'], config
= self
.config
)
190 except model
.SafeException
as ex
:
191 assert 'NO-SUCH-FILE' in str(ex
)
193 rootLogger
.disabled
= False
194 rootLogger
.setLevel(WARN
)
196 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello')
197 self
.assertEqual(None, hello
)
199 with
output_suppressed():
200 run_server('6FCF121BE2390E0B.gpg')
201 sys
.stdin
= Reply("Y\n")
203 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
204 cli
.main(['--import', 'Hello'], config
= self
.config
)
205 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
207 # Check we imported the interface after trusting the key
208 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
209 self
.assertEqual(1, len(hello
.implementations
))
211 self
.assertEqual(None, hello
.local_path
)
213 # Shouldn't need to prompt the second time
215 cli
.main(['--import', 'Hello'], config
= self
.config
)
217 def testSelections(self
):
218 from zeroinstall
.injector
import cli
219 root
= qdom
.parse(open("selections.xml"))
220 sels
= selections
.Selections(root
)
221 class Options
: dry_run
= False
223 with
output_suppressed():
224 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
225 sys
.stdin
= Reply("Y\n")
227 self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
231 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
232 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
233 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
235 assert sels
.download_missing(self
.config
) is None
def testHelpers(self):
    """Exercise ``helpers.ensure_cached`` against the local test server.

    Serves the Hello feed, its GPG key, the key-info record and the
    HelloWorld archive, asks ``ensure_cached`` for the feed, and then
    checks that the selected implementation is present in the store
    (``HelloWorld/main`` exists) and that nothing is left to download.
    """
    from zeroinstall import helpers

    feed_url = 'http://example.com:8000/Hello.xml'

    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        # Canned "Y" reply on stdin -- presumably answers the key-trust
        # prompt; confirm against the handler implementation.
        sys.stdin = Reply("Y\n")
        sels = helpers.ensure_cached(feed_url, config=self.config)

        # Resolve the store lookup first, then the selection's digests,
        # keeping the same evaluation order as a single nested call.
        lookup = self.config.stores.lookup_any
        impl_dir = lookup(sels.selections[feed_url].digests)
        main_file = os.path.join(impl_dir, 'HelloWorld', 'main')
        assert os.path.exists(main_file)
        assert sels.download_missing(self.config) is None
248 def testSelectionsWithFeed(self
):
249 from zeroinstall
.injector
import cli
250 root
= qdom
.parse(open("selections.xml"))
251 sels
= selections
.Selections(root
)
253 with
output_suppressed():
254 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
255 sys
.stdin
= Reply("Y\n")
257 tasks
.wait_for_blocker(self
.config
.fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', self
.config
.iface_cache
))
259 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
260 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
261 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
263 assert sels
.download_missing(self
.config
) is None
265 def testAcceptKey(self
):
266 with
output_suppressed():
267 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
268 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
269 assert driver
.need_download()
270 sys
.stdin
= Reply("Y\n")
272 download_and_execute(driver
, ['Hello'], main
= 'Missing')
274 except model
.SafeException
as ex
:
275 if "HelloWorld/Missing" not in str(ex
):
278 def testAutoAcceptKey(self
):
279 self
.config
.auto_approve_keys
= True
280 with
output_suppressed():
281 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
282 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
283 assert driver
.need_download()
284 sys
.stdin
= Reply("")
286 download_and_execute(driver
, ['Hello'], main
= 'Missing')
288 except model
.SafeException
as ex
:
289 if "HelloWorld/Missing" not in str(ex
):
292 def testDistro(self
):
293 with
output_suppressed():
294 native_url
= 'http://example.com:8000/Native.xml'
296 # Initially, we don't have the feed at all...
297 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
298 assert master_feed
is None, master_feed
300 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
301 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
302 driver
= Driver(requirements
= Requirements(native_url
), config
= self
.config
)
303 assert driver
.need_download()
305 solve
= driver
.solve_with_downloads()
306 tasks
.wait_for_blocker(solve
)
309 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
310 assert master_feed
is not None
311 assert master_feed
.implementations
== {}
313 distro_feed_url
= master_feed
.get_distro_feed()
314 assert distro_feed_url
is not None
315 distro_feed
= self
.config
.iface_cache
.get_feed(distro_feed_url
)
316 assert distro_feed
is not None
317 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
319 def testWrongSize(self
):
320 with
output_suppressed():
321 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
322 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
323 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello-wrong-size'), config
= self
.config
)
324 assert driver
.need_download()
325 sys
.stdin
= Reply("Y\n")
327 download_and_execute(driver
, ['Hello'], main
= 'Missing')
329 except model
.SafeException
as ex
:
330 if "Downloaded archive has incorrect size" not in str(ex
):
333 def testRecipe(self
):
336 sys
.stdout
= StringIO()
337 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
338 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
340 download_and_execute(driver
, [])
342 except model
.SafeException
as ex
:
343 if "HelloWorld/Missing" not in str(ex
):
348 def testSymlink(self
):
351 sys
.stdout
= StringIO()
352 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
353 driver
= Driver(requirements
= Requirements(os
.path
.abspath('RecipeSymlink.xml')), config
= self
.config
)
355 download_and_execute(driver
, [])
357 except model
.SafeException
as ex
:
358 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
360 self
.assertEqual(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
364 def testAutopackage(self
):
367 sys
.stdout
= StringIO()
368 run_server('HelloWorld.autopackage')
369 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Autopackage.xml')), config
= self
.config
)
371 download_and_execute(driver
, [])
373 except model
.SafeException
as ex
:
374 if "HelloWorld/Missing" not in str(ex
):
379 def testRecipeFailure(self
):
382 sys
.stdout
= StringIO()
384 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
386 download_and_execute(driver
, [])
388 except download
.DownloadError
as ex
:
389 if "Connection" not in str(ex
):
394 def testMirrors(self
):
397 sys
.stdout
= StringIO()
398 getLogger().setLevel(ERROR
)
399 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
400 run_server(server
.Give404('/Hello.xml'),
401 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
402 '/0mirror/keys/6FCF121BE2390E0B.gpg')
403 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
404 self
.config
.feed_mirror
= 'http://example.com:8000/0mirror'
406 refreshed
= driver
.solve_with_downloads()
407 tasks
.wait_for_blocker(refreshed
)
408 assert driver
.solver
.ready
412 def testReplay(self
):
415 sys
.stdout
= StringIO()
416 getLogger().setLevel(ERROR
)
417 iface
= self
.config
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
418 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
419 self
.config
.iface_cache
.update_feed_from_network(iface
.uri
, open('Hello-new.xml').read(), mtime
+ 10000)
421 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
422 run_server(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
423 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
424 self
.config
.feed_mirror
= 'http://example.com:8000/0mirror'
426 # Update from mirror (should ignore out-of-date timestamp)
427 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
428 tasks
.wait_for_blocker(refreshed
)
430 # Update from upstream (should report an error)
431 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
433 tasks
.wait_for_blocker(refreshed
)
434 raise Exception("Should have been rejected!")
435 except model
.SafeException
as ex
:
436 assert "New feed's modification time is before old version" in str(ex
)
438 # Must finish with the newest version
439 self
.assertEqual(1235911552, self
.config
.iface_cache
._get
_signature
_date
(iface
.uri
))
443 def testBackground(self
, verbose
= False):
444 r
= Requirements('http://example.com:8000/Hello.xml')
445 d
= Driver(requirements
= r
, config
= self
.config
)
446 self
.import_feed(r
.interface_uri
, 'Hello.xml')
447 self
.config
.freshness
= 0
448 self
.config
.network_use
= model
.network_minimal
449 d
.solver
.solve(r
.interface_uri
, arch
.get_host_architecture())
450 assert d
.solver
.ready
, d
.solver
.get_failure_reason()
453 def choose_download(registed_cb
, nid
, actions
):
455 assert actions
== ['download', 'Download'], actions
456 registed_cb(nid
, 'download')
459 traceback
.print_exc()
464 os
.environ
['DISPLAY'] = 'dummy'
467 sys
.stdout
= StringIO()
468 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
469 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
470 my_dbus
.user_callback
= choose_download
472 with
trapped_exit(1):
473 from zeroinstall
.injector
import config
474 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
475 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
477 background
.spawn_background_update(d
, verbose
)
479 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
484 def testBackgroundVerbose(self
):
485 self
.testBackground(verbose
= True)
487 def testBackgroundApp(self
):
488 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
490 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
494 with
output_suppressed():
495 # Select a version of Hello
496 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
497 r
= Requirements('http://example.com:8000/Hello.xml')
498 driver
= Driver(requirements
= r
, config
= self
.config
)
499 tasks
.wait_for_blocker(driver
.solve_with_downloads())
500 assert driver
.solver
.ready
501 kill_server_process()
504 app
= self
.config
.app_mgr
.create_app('test-app', r
)
505 app
.set_selections(driver
.solver
.selections
)
506 timestamp
= os
.path
.join(app
.path
, 'last-checked')
507 last_check_attempt
= os
.path
.join(app
.path
, 'last-check-attempt')
508 selections_path
= os
.path
.join(app
.path
, 'selections.xml')
510 def reset_timestamps():
512 os
.utime(timestamp
, (1, 1)) # 1970
513 os
.utime(selections_path
, (1, 1))
514 if os
.path
.exists(last_check_attempt
):
515 os
.unlink(last_check_attempt
)
517 # Download the implementation
518 sels
= app
.get_selections()
519 run_server('HelloWorld.tgz')
520 tasks
.wait_for_blocker(app
.download_selections(sels
))
521 kill_server_process()
523 # Not time for a background update yet
524 self
.config
.freshness
= 100
525 dl
= app
.download_selections(sels
)
529 # Trigger a background update - no updates found
531 run_server('Hello.xml')
532 with
trapped_exit(1):
533 dl
= app
.download_selections(sels
)
536 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
537 self
.assertEqual(1, os
.stat(selections_path
).st_mtime
)
538 kill_server_process()
540 # Change the selections
541 sels_path
= os
.path
.join(app
.path
, 'selections.xml')
542 with
open(sels_path
) as stream
:
544 with
open(sels_path
, 'w') as stream
:
545 stream
.write(old
.replace('Hello', 'Goodbye'))
547 # Trigger another background update - metadata changes found
549 run_server('Hello.xml')
550 with
trapped_exit(1):
551 dl
= app
.download_selections(sels
)
554 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
555 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
556 kill_server_process()
558 # Trigger another background update - GUI needed now
560 # Delete cached implementation so we need to download it again
561 stored
= sels
.selections
['http://example.com:8000/Hello.xml'].get_path(self
.config
.stores
)
562 assert os
.path
.basename(stored
).startswith('sha1')
565 # Replace with a valid local feed so we don't have to download immediately
566 with
open(sels_path
, 'w') as stream
:
567 stream
.write(local_hello
)
568 sels
= app
.get_selections()
570 os
.environ
['DISPLAY'] = 'dummy'
572 run_server('Hello.xml')
573 with
trapped_exit(1):
574 dl
= app
.download_selections(sels
)
576 assert ran_gui
# (so doesn't actually update)
577 kill_server_process()
579 # Now again with no DISPLAY
581 del os
.environ
['DISPLAY']
582 run_server('Hello.xml', 'HelloWorld.tgz')
583 with
trapped_exit(1):
584 dl
= app
.download_selections(sels
)
586 assert ran_gui
# (so doesn't actually update)
588 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
589 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
590 kill_server_process()
592 sels
= app
.get_selections()
593 sel
, = sels
.selections
.values()
594 self
.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel
.id)
597 trust
.trust_db
.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
599 os
.environ
['DISPLAY'] = 'dummy'
601 run_server('Hello.xml')
602 with
trapped_exit(1):
603 #import logging; logging.getLogger().setLevel(logging.INFO)
604 dl
= app
.download_selections(sels
)
607 kill_server_process()
609 # Update not triggered because of last-check-attempt
611 os
.utime(timestamp
, (1, 1)) # 1970
612 os
.utime(selections_path
, (1, 1))
613 dl
= app
.download_selections(sels
)
617 if __name__
== '__main__':
621 kill_server_process()