More Python 3 support
[zeroinstall/solver.git] / zeroinstall / apps.py
blob9ff913f76be3b6cdc0dab3e9868e05566a24bec5
1 """
2 Support for managing apps (as created with "0install add").
3 @since: 1.9
4 """
6 # Copyright (C) 2012, Thomas Leonard
7 # See the README file for details, or visit http://0install.net.
9 from zeroinstall import _, SafeException
10 from zeroinstall.support import basedir, portable_rename
11 from zeroinstall.injector import namespaces, selections, qdom
12 from logging import warn, info
13 import re, os, time, tempfile
15 # Avoid characters that are likely to cause problems (reject : and ; everywhere
16 # so that apps can be portable between POSIX and Windows).
17 valid_name = re.compile(r'''^[^./\\:=;'"][^/\\:=;'"]*$''')
19 def validate_name(name):
20 if valid_name.match(name): return
21 raise SafeException("Invalid application name '{name}'".format(name = name))
23 def find_bin_dir(paths = None):
24 """Find the first writable path in the list (default $PATH),
25 skipping /bin, /sbin and everything under /usr except /usr/local/bin"""
26 if paths is None:
27 paths = os.environ['PATH'].split(os.pathsep)
28 for path in paths:
29 if path.startswith('/usr/') and not path.startswith('/usr/local/bin'):
30 # (/usr/local/bin is OK if we're running as root)
31 pass
32 elif path.startswith('/bin') or path.startswith('/sbin'):
33 pass
34 elif os.path.realpath(path).startswith(basedir.xdg_cache_home):
35 pass # print "Skipping cache", first_path
36 elif not os.access(path, os.W_OK):
37 pass # print "No access", first_path
38 else:
39 break
40 else:
41 return None
43 return path
45 _command_template = """#!/bin/sh
46 exec 0install run {app} "$@"
47 """
49 class App:
50 def __init__(self, config, path):
51 self.config = config
52 self.path = path
54 def set_selections(self, sels):
55 """Store a new set of selections. We include today's date in the filename
56 so that we keep a history of previous selections (max one per day), in case
57 we want to to roll back later."""
58 date = time.strftime('%Y-%m-%d')
59 sels_file = os.path.join(self.path, 'selections-{date}.xml'.format(date = date))
60 dom = sels.toDOM()
62 tmp = tempfile.NamedTemporaryFile(prefix = 'selections.xml-', dir = self.path, delete = False, mode = 'wt')
63 try:
64 dom.writexml(tmp, addindent=" ", newl="\n", encoding = 'utf-8')
65 except:
66 tmp.close()
67 os.unlink(tmp.name)
68 raise
69 tmp.close()
70 portable_rename(tmp.name, sels_file)
72 sels_latest = os.path.join(self.path, 'selections.xml')
73 if os.path.exists(sels_latest):
74 os.unlink(sels_latest)
75 os.symlink(os.path.basename(sels_file), sels_latest)
77 self.set_last_checked()
79 def get_selections(self, snapshot_date = None):
80 """Load the selections. Does not check whether they are cached, nor trigger updates.
81 @param snapshot_date: get a historical snapshot
82 @type snapshot_date: (as returned by L{get_history}) | None
83 @return: the selections
84 @rtype: L{selections.Selections}"""
85 if snapshot_date:
86 sels_file = os.path.join(self.path, 'selections-' + snapshot_date + '.xml')
87 else:
88 sels_file = os.path.join(self.path, 'selections.xml')
89 with open(sels_file, 'rb') as stream:
90 return selections.Selections(qdom.parse(stream))
92 def get_history(self):
93 """Get the dates of the available snapshots, starting with the most recent.
94 @rtype: [str]"""
95 date_re = re.compile('selections-(\d\d\d\d-\d\d-\d\d).xml')
96 snapshots = []
97 for f in os.listdir(self.path):
98 match = date_re.match(f)
99 if match:
100 snapshots.append(match.group(1))
101 snapshots.sort(reverse = True)
102 return snapshots
104 def download_selections(self, sels):
105 """Download any missing implementations in the given selections.
106 If no downloads are needed, but we haven't checked for a while, start
107 a background process to check for updates (but return None immediately).
108 @return: a blocker which resolves when all needed implementations are available
109 @rtype: L{tasks.Blocker} | None
111 # Check the selections are still available
112 blocker = sels.download_missing(self.config) # TODO: package impls
114 if blocker:
115 return blocker
116 else:
117 # Nothing to download, but is it time for a background update?
118 timestamp_path = os.path.join(self.path, 'last-checked')
119 try:
120 utime = os.stat(timestamp_path).st_mtime
121 staleness = time.time() - utime
122 info("Staleness of app %s is %d hours", self, staleness / (60 * 60))
123 freshness_threshold = self.config.freshness
124 need_update = freshness_threshold > 0 and staleness >= freshness_threshold
126 if need_update:
127 last_check_attempt_path = os.path.join(self.path, 'last-check-attempt')
128 if os.path.exists(last_check_attempt_path):
129 last_check_attempt = os.stat(last_check_attempt_path).st_mtime
130 if last_check_attempt + 60 * 60 > time.time():
131 info("Tried to check within last hour; not trying again now")
132 need_update = False
133 except Exception as ex:
134 warn("Failed to get time-stamp of %s: %s", timestamp_path, ex)
135 need_update = True
137 if need_update:
138 self.set_last_check_attempt()
139 from zeroinstall.injector import background
140 r = self.get_requirements()
141 background.spawn_background_update2(r, True, self)
143 def set_requirements(self, requirements):
144 import json
145 tmp = tempfile.NamedTemporaryFile(prefix = 'tmp-requirements-', dir = self.path, delete = False, mode = 'wt')
146 try:
147 json.dump(dict((key, getattr(requirements, key)) for key in requirements.__slots__), tmp)
148 except:
149 tmp.close()
150 os.unlink(tmp.name)
151 raise
152 tmp.close()
154 reqs_file = os.path.join(self.path, 'requirements.json')
155 portable_rename(tmp.name, reqs_file)
157 def get_requirements(self):
158 import json
159 from zeroinstall.injector import requirements
160 r = requirements.Requirements(None)
161 reqs_file = os.path.join(self.path, 'requirements.json')
162 with open(reqs_file, 'rt') as stream:
163 values = json.load(stream)
164 for k, v in values.items():
165 setattr(r, k, v)
166 return r
168 def set_last_check_attempt(self):
169 timestamp_path = os.path.join(self.path, 'last-check-attempt')
170 fd = os.open(timestamp_path, os.O_WRONLY | os.O_CREAT, 0o644)
171 os.close(fd)
172 os.utime(timestamp_path, None) # In case file already exists
174 def get_last_checked(self):
175 """Get the time of the last successful check for updates.
176 @return: the timestamp (or None on error)
177 @rtype: float | None"""
178 last_updated_path = os.path.join(self.path, 'last-checked')
179 try:
180 return os.stat(last_updated_path).st_mtime
181 except Exception as ex:
182 warn("Failed to get time-stamp of %s: %s", last_updated_path, ex)
183 return None
185 def get_last_check_attempt(self):
186 """Get the time of the last attempted check.
187 @return: the timestamp, or None if we updated successfully.
188 @rtype: float | None"""
189 last_check_attempt_path = os.path.join(self.path, 'last-check-attempt')
190 if os.path.exists(last_check_attempt_path):
191 last_check_attempt = os.stat(last_check_attempt_path).st_mtime
193 last_checked = self.get_last_checked()
195 if last_checked < last_check_attempt:
196 return last_check_attempt
197 return None
199 def set_last_checked(self):
200 timestamp_path = os.path.join(self.path, 'last-checked')
201 fd = os.open(timestamp_path, os.O_WRONLY | os.O_CREAT, 0o644)
202 os.close(fd)
203 os.utime(timestamp_path, None) # In case file already exists
205 def destroy(self):
206 # Check for shell command
207 # TODO: remember which commands we own instead of guessing
208 name = self.get_name()
209 bin_dir = find_bin_dir()
210 launcher = os.path.join(bin_dir, name)
211 expanded_template = _command_template.format(app = name)
212 if os.path.exists(launcher) and os.path.getsize(launcher) == len(expanded_template):
213 with open(launcher, 'r') as stream:
214 contents = stream.read()
215 if contents == expanded_template:
216 #print "rm", launcher
217 os.unlink(launcher)
219 # Remove the app itself
220 import shutil
221 shutil.rmtree(self.path)
223 def integrate_shell(self, name):
224 # TODO: remember which commands we create
225 if not valid_name.match(name):
226 raise SafeException("Invalid shell command name '{name}'".format(name = name))
227 bin_dir = find_bin_dir()
228 launcher = os.path.join(bin_dir, name)
229 if os.path.exists(launcher):
230 raise SafeException("Command already exists: {path}".format(path = launcher))
232 with open(launcher, 'w') as stream:
233 stream.write(_command_template.format(app = self.get_name()))
234 # Make new script executable
235 os.chmod(launcher, 0o111 | os.fstat(stream.fileno()).st_mode)
237 def get_name(self):
238 return os.path.basename(self.path)
240 def __str__(self):
241 return '<app ' + self.get_name() + '>'
243 class AppManager:
244 def __init__(self, config):
245 self.config = config
247 def create_app(self, name, requirements):
248 validate_name(name)
250 apps_dir = basedir.save_config_path(namespaces.config_site, "apps")
251 app_dir = os.path.join(apps_dir, name)
252 if os.path.isdir(app_dir):
253 raise SafeException(_("Application '{name}' already exists: {path}").format(name = name, path = app_dir))
254 os.mkdir(app_dir)
256 app = App(self.config, app_dir)
257 app.set_requirements(requirements)
258 app.set_last_checked()
260 return app
262 def lookup_app(self, name, missing_ok = False):
263 """Get the App for name.
264 Returns None if name is not an application (doesn't exist or is not a valid name).
265 Since / and : are not valid name characters, it is generally safe to try this
266 before calling L{injector.model.canonical_iface_uri}."""
267 if not valid_name.match(name):
268 if missing_ok:
269 return None
270 else:
271 raise SafeException("Invalid application name '{name}'".format(name = name))
272 app_dir = basedir.load_first_config(namespaces.config_site, "apps", name)
273 if app_dir:
274 return App(self.config, app_dir)
275 if missing_ok:
276 return None
277 else:
278 raise SafeException("No such application '{name}'".format(name = name))