2 Support for managing apps (as created with "0install add").
6 # Copyright (C) 2012, Thomas Leonard
7 # See the README file for details, or visit http://0install.net.
9 from zeroinstall
import _
, SafeException
, logger
10 from zeroinstall
.support
import basedir
, portable_rename
11 from zeroinstall
.injector
import namespaces
, selections
, qdom
12 import re
, os
, time
, tempfile
14 # Avoid characters that are likely to cause problems (reject : and ; everywhere
15 # so that apps can be portable between POSIX and Windows).
16 valid_name
= re
.compile(r
'''^[^./\\:=;'"][^/\\:=;'"]*$''')
18 def validate_name(name
):
19 if valid_name
.match(name
): return
20 raise SafeException("Invalid application name '{name}'".format(name
= name
))
22 def _export(name
, value
):
23 """Try to guess the command to set an environment variable."""
24 shell
= os
.environ
.get('SHELL', '?')
26 return "setenv %s %s" % (name
, value
)
27 return "export %s=%s" % (name
, value
)
29 def find_bin_dir(paths
= None):
30 """Find the first writable path in the list (default $PATH),
31 skipping /bin, /sbin and everything under /usr except /usr/local/bin"""
33 paths
= os
.environ
['PATH'].split(os
.pathsep
)
35 if path
.startswith('/usr/') and not path
.startswith('/usr/local/bin'):
36 # (/usr/local/bin is OK if we're running as root)
38 elif path
.startswith('/bin') or path
.startswith('/sbin'):
40 elif os
.path
.realpath(path
).startswith(basedir
.xdg_cache_home
):
41 pass # print "Skipping cache", first_path
42 elif not os
.access(path
, os
.W_OK
):
43 pass # print "No access", first_path
47 path
= os
.path
.expanduser('~/bin/')
48 logger
.warn('%s is not in $PATH. Add it with:\n%s' % (path
, _export('PATH', path
+ ':$PATH')))
50 if not os
.path
.isdir(path
):
54 _command_template
= """#!/bin/sh
55 exec 0install run {app} "$@"
59 def __init__(self
, config
, path
):
63 def set_selections(self
, sels
):
64 """Store a new set of selections. We include today's date in the filename
65 so that we keep a history of previous selections (max one per day), in case
66 we want to to roll back later."""
67 date
= time
.strftime('%Y-%m-%d')
68 sels_file
= os
.path
.join(self
.path
, 'selections-{date}.xml'.format(date
= date
))
71 tmp
= tempfile
.NamedTemporaryFile(prefix
= 'selections.xml-', dir = self
.path
, delete
= False, mode
= 'wt')
73 dom
.writexml(tmp
, addindent
=" ", newl
="\n", encoding
= 'utf-8')
79 portable_rename(tmp
.name
, sels_file
)
81 sels_latest
= os
.path
.join(self
.path
, 'selections.xml')
82 if os
.path
.exists(sels_latest
):
83 os
.unlink(sels_latest
)
84 os
.symlink(os
.path
.basename(sels_file
), sels_latest
)
86 self
.set_last_checked()
88 def get_selections(self
, snapshot_date
= None):
89 """Load the selections. Does not check whether they are cached, nor trigger updates.
90 @param snapshot_date: get a historical snapshot
91 @type snapshot_date: (as returned by L{get_history}) | None
92 @return: the selections
93 @rtype: L{selections.Selections}"""
95 sels_file
= os
.path
.join(self
.path
, 'selections-' + snapshot_date
+ '.xml')
97 sels_file
= os
.path
.join(self
.path
, 'selections.xml')
98 with
open(sels_file
, 'rb') as stream
:
99 return selections
.Selections(qdom
.parse(stream
))
101 def get_history(self
):
102 """Get the dates of the available snapshots, starting with the most recent.
104 date_re
= re
.compile('selections-(\d\d\d\d-\d\d-\d\d).xml')
106 for f
in os
.listdir(self
.path
):
107 match
= date_re
.match(f
)
109 snapshots
.append(match
.group(1))
110 snapshots
.sort(reverse
= True)
113 def download_selections(self
, sels
):
114 """Download any missing implementations in the given selections.
115 If no downloads are needed, but we haven't checked for a while, start
116 a background process to check for updates (but return None immediately).
117 @return: a blocker which resolves when all needed implementations are available
118 @rtype: L{tasks.Blocker} | None
120 # Check the selections are still available
121 blocker
= sels
.download_missing(self
.config
) # TODO: package impls
126 # Nothing to download, but is it time for a background update?
127 timestamp_path
= os
.path
.join(self
.path
, 'last-checked')
129 utime
= os
.stat(timestamp_path
).st_mtime
130 staleness
= time
.time() - utime
131 logger
.info("Staleness of app %s is %d hours", self
, staleness
/ (60 * 60))
132 freshness_threshold
= self
.config
.freshness
133 need_update
= freshness_threshold
> 0 and staleness
>= freshness_threshold
136 last_check_attempt_path
= os
.path
.join(self
.path
, 'last-check-attempt')
137 if os
.path
.exists(last_check_attempt_path
):
138 last_check_attempt
= os
.stat(last_check_attempt_path
).st_mtime
139 if last_check_attempt
+ 60 * 60 > time
.time():
140 logger
.info("Tried to check within last hour; not trying again now")
142 except Exception as ex
:
143 logger
.warn("Failed to get time-stamp of %s: %s", timestamp_path
, ex
)
147 self
.set_last_check_attempt()
148 from zeroinstall
.injector
import background
149 r
= self
.get_requirements()
150 background
.spawn_background_update2(r
, False, self
)
152 def set_requirements(self
, requirements
):
154 tmp
= tempfile
.NamedTemporaryFile(prefix
= 'tmp-requirements-', dir = self
.path
, delete
= False, mode
= 'wt')
156 json
.dump(dict((key
, getattr(requirements
, key
)) for key
in requirements
.__slots
__), tmp
)
163 reqs_file
= os
.path
.join(self
.path
, 'requirements.json')
164 portable_rename(tmp
.name
, reqs_file
)
166 def get_requirements(self
):
168 from zeroinstall
.injector
import requirements
169 r
= requirements
.Requirements(None)
170 reqs_file
= os
.path
.join(self
.path
, 'requirements.json')
171 with
open(reqs_file
, 'rt') as stream
:
172 values
= json
.load(stream
)
173 for k
, v
in values
.items():
177 def set_last_check_attempt(self
):
178 timestamp_path
= os
.path
.join(self
.path
, 'last-check-attempt')
179 fd
= os
.open(timestamp_path
, os
.O_WRONLY | os
.O_CREAT
, 0o644)
181 os
.utime(timestamp_path
, None) # In case file already exists
183 def get_last_checked(self
):
184 """Get the time of the last successful check for updates.
185 @return: the timestamp (or None on error)
186 @rtype: float | None"""
187 last_updated_path
= os
.path
.join(self
.path
, 'last-checked')
189 return os
.stat(last_updated_path
).st_mtime
190 except Exception as ex
:
191 logger
.warn("Failed to get time-stamp of %s: %s", last_updated_path
, ex
)
194 def get_last_check_attempt(self
):
195 """Get the time of the last attempted check.
196 @return: the timestamp, or None if we updated successfully.
197 @rtype: float | None"""
198 last_check_attempt_path
= os
.path
.join(self
.path
, 'last-check-attempt')
199 if os
.path
.exists(last_check_attempt_path
):
200 last_check_attempt
= os
.stat(last_check_attempt_path
).st_mtime
202 last_checked
= self
.get_last_checked()
204 if last_checked
< last_check_attempt
:
205 return last_check_attempt
208 def set_last_checked(self
):
209 timestamp_path
= os
.path
.join(self
.path
, 'last-checked')
210 fd
= os
.open(timestamp_path
, os
.O_WRONLY | os
.O_CREAT
, 0o644)
212 os
.utime(timestamp_path
, None) # In case file already exists
215 # Check for shell command
216 # TODO: remember which commands we own instead of guessing
217 name
= self
.get_name()
218 bin_dir
= find_bin_dir()
219 launcher
= os
.path
.join(bin_dir
, name
)
220 expanded_template
= _command_template
.format(app
= name
)
221 if os
.path
.exists(launcher
) and os
.path
.getsize(launcher
) == len(expanded_template
):
222 with
open(launcher
, 'r') as stream
:
223 contents
= stream
.read()
224 if contents
== expanded_template
:
225 #print "rm", launcher
228 # Remove the app itself
230 shutil
.rmtree(self
.path
)
232 def integrate_shell(self
, name
):
233 # TODO: remember which commands we create
234 if not valid_name
.match(name
):
235 raise SafeException("Invalid shell command name '{name}'".format(name
= name
))
236 bin_dir
= find_bin_dir()
237 launcher
= os
.path
.join(bin_dir
, name
)
238 if os
.path
.exists(launcher
):
239 raise SafeException("Command already exists: {path}".format(path
= launcher
))
241 with
open(launcher
, 'w') as stream
:
242 stream
.write(_command_template
.format(app
= self
.get_name()))
243 # Make new script executable
244 os
.chmod(launcher
, 0o111 | os
.fstat(stream
.fileno()).st_mode
)
247 return os
.path
.basename(self
.path
)
250 return '<app ' + self
.get_name() + '>'
253 def __init__(self
, config
):
256 def create_app(self
, name
, requirements
):
259 apps_dir
= basedir
.save_config_path(namespaces
.config_site
, "apps")
260 app_dir
= os
.path
.join(apps_dir
, name
)
261 if os
.path
.isdir(app_dir
):
262 raise SafeException(_("Application '{name}' already exists: {path}").format(name
= name
, path
= app_dir
))
265 app
= App(self
.config
, app_dir
)
266 app
.set_requirements(requirements
)
267 app
.set_last_checked()
271 def lookup_app(self
, name
, missing_ok
= False):
272 """Get the App for name.
273 Returns None if name is not an application (doesn't exist or is not a valid name).
274 Since / and : are not valid name characters, it is generally safe to try this
275 before calling L{injector.model.canonical_iface_uri}."""
276 if not valid_name
.match(name
):
280 raise SafeException("Invalid application name '{name}'".format(name
= name
))
281 app_dir
= basedir
.load_first_config(namespaces
.config_site
, "apps", name
)
283 return App(self
.config
, app_dir
)
287 raise SafeException("No such application '{name}'".format(name
= name
))