2 from __future__
import with_statement
3 from basetest
import BaseTest
5 import sys
, tempfile
, os
6 if sys
.version_info
[0] > 2:
7 from io
import StringIO
9 from StringIO
import StringIO
10 import unittest
, signal
11 from logging
import getLogger
, WARN
, ERROR
12 from contextlib
import contextmanager
14 sys
.path
.insert(0, '..')
16 os
.environ
["http_proxy"] = "localhost:8000"
18 from zeroinstall
import helpers
19 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
20 from zeroinstall
.injector
.requirements
import Requirements
21 from zeroinstall
.injector
.driver
import Driver
22 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
23 from zeroinstall
.support
import basedir
, tasks
, ro_rmtree
24 from zeroinstall
.injector
import fetch
# Replace background._detach with a stub that takes no arguments and always
# returns False.
# NOTE(review): presumably this stops background updates from detaching from
# the test process -- confirm against zeroinstall.injector.background.
background._detach = lambda: False
36 local_hello
= """<?xml version="1.0" ?>
37 <selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
38 <selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
42 def output_suppressed():
43 old_stdout
= sys
.stdout
44 old_stderr
= sys
.stderr
46 sys
.stdout
= StringIO()
47 sys
.stderr
= StringIO()
52 except BaseException
as ex
:
53 # Don't abort unit-tests if someone raises SystemExit
54 raise Exception(str(type(ex
)) + " " + str(ex
))
56 sys
.stdout
= old_stdout
57 sys
.stderr
= old_stderr
60 def trapped_exit(expected_exit_status
):
64 # The background handler runs in the same process
65 # as the tests, so don't let it abort.
66 if os
.getpid() == pid
:
67 raise SystemExit(code
)
68 # But, child download processes are OK
75 except SystemExit as ex
:
76 assert ex
.code
== expected_exit_status
81 def __init__(self
, reply
):
def download_and_execute(driver, prog_args, main=None):
    """Download what *driver* needs, then hand its selections to ``run``.

    ``driver_download(driver)`` is called first.  Afterwards the solver's
    selections are passed to ``run.execute_selections`` together with
    *prog_args*, the stores taken from ``driver.config`` and the optional
    *main* override (``None`` by default, forwarded unchanged).
    """
    driver_download(driver)

    chosen = driver.solver.selections
    impl_stores = driver.config.stores
    run.execute_selections(chosen, prog_args, stores=impl_stores, main=main)
91 def driver_download(driver
):
92 downloaded
= driver
.solve_and_download_impls()
94 tasks
.wait_for_blocker(downloaded
)
98 return 3 # NM_STATUS_CONNECTED
100 server_process
= None
101 def kill_server_process():
102 global server_process
103 if server_process
is not None:
104 # The process may still be running. See
105 # http://bugs.python.org/issue14252 for why this is so
107 server_process
.stdout
.close()
109 server_process
.kill()
112 server_process
.kill()
113 except WindowsError as e
:
114 # This is what happens when terminate
115 # is called after the process has died.
116 if e
.winerror
== 5 and e
.strerror
== 'Access is denied':
117 assert not server_process
.poll()
120 server_process
.wait()
121 server_process
= None
def run_server(*args):
    """Start the helper server and record it in ``server_process``.

    Every positional argument is forwarded unchanged to
    ``server.handle_requests``; whatever that call returns becomes the new
    value of the module-level ``server_process``.

    Only one server may be recorded at a time: the function asserts that
    ``server_process`` is ``None`` before starting a new one.
    """
    global server_process
    assert server_process is None
    launched = server.handle_requests(*args)
    server_process = launched
# Keep a reference to the original helpers.get_selections_gui.  Elsewhere in
# this file the attribute is overwritten (with raise_gui) and later assigned
# back from this saved reference.
real_get_selections_gui = helpers.get_selections_gui
130 class TestDownload(BaseTest
):
134 self
.config
.handler
.allow_downloads
= True
135 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
137 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
139 stream
= tempfile
.TemporaryFile()
140 stream
.write(data
.thomas_key
)
142 gpg
.import_key(stream
)
145 trust
.trust_db
.watchers
= []
147 helpers
.get_selections_gui
= raise_gui
153 helpers
.get_selections_gui
= real_get_selections_gui
154 BaseTest
.tearDown(self
)
155 kill_server_process()
157 def testRejectKey(self
):
158 with
output_suppressed():
159 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
160 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
161 assert driver
.need_download()
162 sys
.stdin
= Reply("N\n")
164 download_and_execute(driver
, ['Hello'])
166 except model
.SafeException
as ex
:
167 if "has no usable implementations" not in str(ex
):
169 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
170 raise self
.config
.handler
.ex
171 self
.config
.handler
.ex
= None
173 def testRejectKeyXML(self
):
174 with
output_suppressed():
175 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
176 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
177 assert driver
.need_download()
178 sys
.stdin
= Reply("N\n")
180 download_and_execute(driver
, ['Hello'])
182 except model
.SafeException
as ex
:
183 if "has no usable implementations" not in str(ex
):
185 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
187 self
.config
.handler
.ex
= None
189 def testImport(self
):
190 from zeroinstall
.injector
import cli
192 rootLogger
= getLogger()
193 rootLogger
.disabled
= True
196 cli
.main(['--import', '-v', 'NO-SUCH-FILE'], config
= self
.config
)
198 except model
.SafeException
as ex
:
199 assert 'NO-SUCH-FILE' in str(ex
)
201 rootLogger
.disabled
= False
202 rootLogger
.setLevel(WARN
)
204 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello')
205 self
.assertEqual(None, hello
)
207 with
output_suppressed():
208 run_server('6FCF121BE2390E0B.gpg')
209 sys
.stdin
= Reply("Y\n")
211 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
212 cli
.main(['--import', 'Hello'], config
= self
.config
)
213 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
215 # Check we imported the interface after trusting the key
216 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
217 self
.assertEqual(1, len(hello
.implementations
))
219 self
.assertEqual(None, hello
.local_path
)
221 # Shouldn't need to prompt the second time
223 cli
.main(['--import', 'Hello'], config
= self
.config
)
225 def testSelections(self
):
226 from zeroinstall
.injector
import cli
227 with
open("selections.xml", 'rb') as stream
:
228 root
= qdom
.parse(stream
)
229 sels
= selections
.Selections(root
)
230 class Options
: dry_run
= False
232 with
output_suppressed():
233 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
234 sys
.stdin
= Reply("Y\n")
236 self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
240 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
241 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
242 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
244 assert sels
.download_missing(self
.config
) is None
246 def testHelpers(self
):
247 from zeroinstall
import helpers
249 with
output_suppressed():
250 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
251 sys
.stdin
= Reply("Y\n")
252 sels
= helpers
.ensure_cached('http://example.com:8000/Hello.xml', config
= self
.config
)
253 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
254 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
255 assert sels
.download_missing(self
.config
) is None
257 def testSelectionsWithFeed(self
):
258 from zeroinstall
.injector
import cli
259 with
open("selections.xml", 'rb') as stream
:
260 root
= qdom
.parse(stream
)
261 sels
= selections
.Selections(root
)
263 with
output_suppressed():
264 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
265 sys
.stdin
= Reply("Y\n")
267 tasks
.wait_for_blocker(self
.config
.fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', self
.config
.iface_cache
))
269 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
270 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
271 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
273 assert sels
.download_missing(self
.config
) is None
275 def testAcceptKey(self
):
276 with
output_suppressed():
277 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
278 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
279 assert driver
.need_download()
280 sys
.stdin
= Reply("Y\n")
282 download_and_execute(driver
, ['Hello'], main
= 'Missing')
284 except model
.SafeException
as ex
:
285 if "HelloWorld/Missing" not in str(ex
):
288 def testAutoAcceptKey(self
):
289 self
.config
.auto_approve_keys
= True
290 with
output_suppressed():
291 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
292 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
293 assert driver
.need_download()
294 sys
.stdin
= Reply("")
296 download_and_execute(driver
, ['Hello'], main
= 'Missing')
298 except model
.SafeException
as ex
:
299 if "HelloWorld/Missing" not in str(ex
):
302 def testDistro(self
):
303 with
output_suppressed():
304 native_url
= 'http://example.com:8000/Native.xml'
306 # Initially, we don't have the feed at all...
307 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
308 assert master_feed
is None, master_feed
310 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
311 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
312 driver
= Driver(requirements
= Requirements(native_url
), config
= self
.config
)
313 assert driver
.need_download()
315 solve
= driver
.solve_with_downloads()
316 tasks
.wait_for_blocker(solve
)
319 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
320 assert master_feed
is not None
321 assert master_feed
.implementations
== {}
323 distro_feed_url
= master_feed
.get_distro_feed()
324 assert distro_feed_url
is not None
325 distro_feed
= self
.config
.iface_cache
.get_feed(distro_feed_url
)
326 assert distro_feed
is not None
327 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
329 def testWrongSize(self
):
330 with
output_suppressed():
331 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
332 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
333 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello-wrong-size'), config
= self
.config
)
334 assert driver
.need_download()
335 sys
.stdin
= Reply("Y\n")
337 download_and_execute(driver
, ['Hello'], main
= 'Missing')
339 except model
.SafeException
as ex
:
340 if "Downloaded archive has incorrect size" not in str(ex
):
343 def testRecipe(self
):
346 sys
.stdout
= StringIO()
347 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
348 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
350 download_and_execute(driver
, [])
352 except model
.SafeException
as ex
:
353 if "HelloWorld/Missing" not in str(ex
):
def testRename(self):
    # Solve and download the local feed RecipeRename.xml while the test
    # server offers HelloWorld.tar.bz2, then inspect the stored result:
    # HelloUniverse/minor must exist, whereas HelloWorld and
    # HelloUniverse/main must not.
    # NOTE(review): the original indentation was lost in this copy of the
    # file; every statement is assumed to sit inside the with-block -- confirm.
    with output_suppressed():
        run_server(('HelloWorld.tar.bz2',))

        reqs = Requirements(os.path.abspath('RecipeRename.xml'))
        reqs.command = None

        drv = Driver(requirements=reqs, config=self.config)
        driver_download(drv)

        # Locate the implementation directory selected for the feed.
        selected = drv.solver.selections[reqs.interface_uri]
        impl_dir = self.config.stores.lookup_any(selected.digests)

        renamed_file = os.path.join(impl_dir, 'HelloUniverse', 'minor')
        assert os.path.exists(renamed_file)

        old_top_dir = os.path.join(impl_dir, 'HelloWorld')
        assert not os.path.exists(old_top_dir)

        old_file_name = os.path.join(impl_dir, 'HelloUniverse', 'main')
        assert not os.path.exists(old_file_name)
371 def testSymlink(self
):
374 sys
.stdout
= StringIO()
375 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
376 driver
= Driver(requirements
= Requirements(os
.path
.abspath('RecipeSymlink.xml')), config
= self
.config
)
378 download_and_execute(driver
, [])
380 except model
.SafeException
as ex
:
381 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
383 self
.assertEqual(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
387 def testAutopackage(self
):
390 sys
.stdout
= StringIO()
391 run_server('HelloWorld.autopackage')
392 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Autopackage.xml')), config
= self
.config
)
394 download_and_execute(driver
, [])
396 except model
.SafeException
as ex
:
397 if "HelloWorld/Missing" not in str(ex
):
402 def testRecipeFailure(self
):
405 sys
.stdout
= StringIO()
407 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
409 download_and_execute(driver
, [])
411 except download
.DownloadError
as ex
:
412 if "Connection" not in str(ex
):
417 def testMirrors(self
):
420 sys
.stdout
= StringIO()
421 getLogger().setLevel(ERROR
)
422 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
423 run_server(server
.Give404('/Hello.xml'),
424 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
425 '/0mirror/keys/6FCF121BE2390E0B.gpg')
426 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
427 self
.config
.feed_mirror
= 'http://example.com:8000/0mirror'
429 refreshed
= driver
.solve_with_downloads()
430 tasks
.wait_for_blocker(refreshed
)
431 assert driver
.solver
.ready
435 def testReplay(self
):
438 sys
.stdout
= StringIO()
439 getLogger().setLevel(ERROR
)
440 iface
= self
.config
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
441 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
442 with
open('Hello-new.xml', 'rb') as stream
:
443 self
.config
.iface_cache
.update_feed_from_network(iface
.uri
, stream
.read(), mtime
+ 10000)
445 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
446 run_server(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
447 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
448 self
.config
.feed_mirror
= 'http://example.com:8000/0mirror'
450 # Update from mirror (should ignore out-of-date timestamp)
451 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
452 tasks
.wait_for_blocker(refreshed
)
454 # Update from upstream (should report an error)
455 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
457 tasks
.wait_for_blocker(refreshed
)
458 raise Exception("Should have been rejected!")
459 except model
.SafeException
as ex
:
460 assert "New feed's modification time is before old version" in str(ex
)
462 # Must finish with the newest version
463 self
.assertEqual(1235911552, self
.config
.iface_cache
._get
_signature
_date
(iface
.uri
))
467 def testBackground(self
, verbose
= False):
468 r
= Requirements('http://example.com:8000/Hello.xml')
469 d
= Driver(requirements
= r
, config
= self
.config
)
470 self
.import_feed(r
.interface_uri
, 'Hello.xml')
471 self
.config
.freshness
= 0
472 self
.config
.network_use
= model
.network_minimal
473 d
.solver
.solve(r
.interface_uri
, arch
.get_host_architecture())
474 assert d
.solver
.ready
, d
.solver
.get_failure_reason()
477 def choose_download(registed_cb
, nid
, actions
):
479 assert actions
== ['download', 'Download'], actions
480 registed_cb(nid
, 'download')
483 traceback
.print_exc()
488 os
.environ
['DISPLAY'] = 'dummy'
491 sys
.stdout
= StringIO()
492 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
493 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
494 my_dbus
.user_callback
= choose_download
496 with
trapped_exit(1):
497 from zeroinstall
.injector
import config
498 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
499 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
501 background
.spawn_background_update(d
, verbose
)
503 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
508 def testBackgroundVerbose(self
):
509 self
.testBackground(verbose
= True)
511 def testBackgroundApp(self
):
512 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
514 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
518 with
output_suppressed():
519 # Select a version of Hello
520 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
521 r
= Requirements('http://example.com:8000/Hello.xml')
522 driver
= Driver(requirements
= r
, config
= self
.config
)
523 tasks
.wait_for_blocker(driver
.solve_with_downloads())
524 assert driver
.solver
.ready
525 kill_server_process()
528 app
= self
.config
.app_mgr
.create_app('test-app', r
)
529 app
.set_selections(driver
.solver
.selections
)
530 timestamp
= os
.path
.join(app
.path
, 'last-checked')
531 last_check_attempt
= os
.path
.join(app
.path
, 'last-check-attempt')
532 selections_path
= os
.path
.join(app
.path
, 'selections.xml')
534 def reset_timestamps():
536 os
.utime(timestamp
, (1, 1)) # 1970
537 os
.utime(selections_path
, (1, 1))
538 if os
.path
.exists(last_check_attempt
):
539 os
.unlink(last_check_attempt
)
541 # Download the implementation
542 sels
= app
.get_selections()
543 run_server('HelloWorld.tgz')
544 tasks
.wait_for_blocker(app
.download_selections(sels
))
545 kill_server_process()
547 # Not time for a background update yet
548 self
.config
.freshness
= 100
549 dl
= app
.download_selections(sels
)
553 # Trigger a background update - no updates found
555 run_server('Hello.xml')
556 with
trapped_exit(1):
557 dl
= app
.download_selections(sels
)
560 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
561 self
.assertEqual(1, os
.stat(selections_path
).st_mtime
)
562 kill_server_process()
564 # Change the selections
565 sels_path
= os
.path
.join(app
.path
, 'selections.xml')
566 with
open(sels_path
) as stream
:
568 with
open(sels_path
, 'w') as stream
:
569 stream
.write(old
.replace('Hello', 'Goodbye'))
571 # Trigger another background update - metadata changes found
573 run_server('Hello.xml')
574 with
trapped_exit(1):
575 dl
= app
.download_selections(sels
)
578 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
579 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
580 kill_server_process()
582 # Trigger another background update - GUI needed now
584 # Delete cached implementation so we need to download it again
585 stored
= sels
.selections
['http://example.com:8000/Hello.xml'].get_path(self
.config
.stores
)
586 assert os
.path
.basename(stored
).startswith('sha1')
589 # Replace with a valid local feed so we don't have to download immediately
590 with
open(sels_path
, 'w') as stream
:
591 stream
.write(local_hello
)
592 sels
= app
.get_selections()
594 os
.environ
['DISPLAY'] = 'dummy'
596 run_server('Hello.xml')
597 with
trapped_exit(1):
598 dl
= app
.download_selections(sels
)
600 assert ran_gui
# (so doesn't actually update)
601 kill_server_process()
603 # Now again with no DISPLAY
605 del os
.environ
['DISPLAY']
606 run_server('Hello.xml', 'HelloWorld.tgz')
607 with
trapped_exit(1):
608 dl
= app
.download_selections(sels
)
610 assert ran_gui
# (so doesn't actually update)
612 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
613 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
614 kill_server_process()
616 sels
= app
.get_selections()
617 sel
, = sels
.selections
.values()
618 self
.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel
.id)
621 trust
.trust_db
.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
623 os
.environ
['DISPLAY'] = 'dummy'
625 run_server('Hello.xml')
626 with
trapped_exit(1):
627 #import logging; logging.getLogger().setLevel(logging.INFO)
628 dl
= app
.download_selections(sels
)
631 kill_server_process()
633 # Update not triggered because of last-check-attempt
635 os
.utime(timestamp
, (1, 1)) # 1970
636 os
.utime(selections_path
, (1, 1))
637 dl
= app
.download_selections(sels
)
642 dl
= download
.Download("http://localhost/test.tgz", auto_delete
= True)
644 assert dl
._aborted
.happened
645 assert dl
.tempfile
is None
647 dl
= download
.Download("http://localhost/test.tgz", auto_delete
= False)
648 path
= dl
.tempfile
.name
650 assert not os
.path
.exists(path
)
651 assert dl
._aborted
.happened
652 assert dl
.tempfile
is None
654 if __name__
== '__main__':
658 kill_server_process()