Moved into a sub-dir, so that a svn checkout has the same structure as
[rox-lib/lack.git] / ROX-Lib2 / python / rox / su.py
blobdb8c54e896bc47687aa3bad6eb09893b07048577
1 """The su (switch user) module allows you to execute commands as root.
Typical usage:

def fn():
    proxy_maker = SuProxyMaker('I need root access to change /etc/fstab')
    yield proxy_maker.blocker
    root = proxy_maker.get_root()

    call = root.open('/etc/fstab')
    yield call
    fd = call.result

    ...

    root.finish()

tasks.Task(fn())

See rox.suchild for a list of operations available on the 'root' object.
20 """
22 import os, sys
23 import rox
24 from rox import master_proxy, tasks
25 import traceback
26 import fcntl
# Absolute directory containing this module; the 'suchild.sh' helper script
# is looked up alongside it.
_my_dir = os.path.dirname(os.path.abspath(__file__))
_child_script = os.path.join(_my_dir, 'suchild.sh')
class SuProxyMaker(tasks.Blocker):
    """Starts a helper process in a terminal and exposes it via a proxy.

    After construction, 'blocker' holds the pending getuid() request sent
    to the helper; once it has completed, call get_root() to obtain the
    proxy object."""

    # Pending getuid() request; reset to None by get_root().
    blocker = None

    def __init__(self, message):
        """Displays a box prompting the user for a password.
        Creates a new master_proxy.MasterProxy and starts the child
        process. The user is prompted for the root
        password.
        'message' is handed to the terminal command together with the
        two pipe ends and the name of the uid 0 account."""
        to_child = _Pipe()
        from_child = _Pipe()

        # Resolve the terminal before forking, so that a missing terminal
        # emulator raises in the calling process rather than in the child.
        exec_term = get_term_command()

        if os.fork() == 0:
            # Child process: must never return into the caller's code, so
            # every path out of this branch ends in os._exit().
            try:
                try:
                    to_child.writeable.close()
                    from_child.readable.close()
                    to_parent = from_child.writeable
                    from_parent = to_child.readable
                    # Clear the descriptor flags (including close-on-exec)
                    # so both pipe ends survive the exec below.
                    fcntl.fcntl(to_parent, fcntl.F_SETFD, 0)
                    fcntl.fcntl(from_parent, fcntl.F_SETFD, 0)
                    import pwd
                    exec_term(message,
                              to_parent,
                              from_parent,
                              pwd.getpwuid(0)[0])
                except:
                    # Deliberately catches everything: report the failure
                    # and fall through to os._exit().
                    traceback.print_exc()
            finally:
                os._exit(1)

        # Parent process: drop the pipe ends that belong to the child.
        from_child.writeable.close()
        to_child.readable.close()

        self._root = master_proxy.MasterProxy(to_child.writeable,
                                              from_child.readable).root

        self.blocker = self._root.getuid()

    def get_root(self):
        """Returns the proxy for the helper process.
        Raises UserAbort if the user cancels.
        Raises AssertionError if the helper reports a uid other than 0."""
        try:
            uid = self.blocker.result
        except master_proxy.LostConnection:
            raise rox.UserAbort("Failed to become root (cancelled "
                                "at user's request?)")
        # Explicit check rather than an 'assert' statement, so that it is
        # still performed when Python runs with -O. The exception type is
        # the same as before.
        if uid != 0:
            raise AssertionError('Helper process reported uid %r, '
                                 'expected 0' % (uid,))
        self.blocker = None
        return self._root
def get_term_command():
    """Returns the function used to run the child script in a terminal.

    Currently only 'xterm' is supported. The returned function is called
    as fn(message, to_parent, from_parent, root_user).
    Raises Exception if no supported terminal emulator is found in the
    search path."""
    def present_in_PATH(command):
        # Fall back to os.defpath when PATH is unset, which is the same
        # default the os.exec*p functions use, and split on os.pathsep
        # rather than a hard-coded ':'.
        search_path = os.environ.get('PATH', os.defpath)
        for directory in search_path.split(os.pathsep):
            candidate = os.path.join(directory, command)
            # A directory also passes the X_OK test, so require a file.
            if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                return True
        return False

    if present_in_PATH('xterm'):
        return _exec_xterm
    raise Exception('No suitable terminal emulator could be found. Try installing "xterm"')
def _exec_xterm(message, to_parent, from_parent, root_user):
    """Replaces the current process with an xterm running the child script.

    The script is given, in order: 'message', the path of the running
    Python interpreter, the descriptor numbers of 'to_parent' and
    'from_parent', and 'root_user'. Does not return if the exec
    succeeds."""
    script_command = [
        _child_script,
        message,
        sys.executable,
        str(to_parent.fileno()),
        str(from_parent.fileno()),
        root_user,
    ]
    xterm_argv = [
        'xterm',
        '-geometry', '40x10',
        '-title', 'Enter password',
        '-e',
    ] + script_command
    os.execvp('xterm', xterm_argv)
def _close_fd_quietly(fd):
    """Closes a raw file descriptor, ignoring OSError, so that cleanup
    never hides the exception that triggered it."""
    try:
        os.close(fd)
    except OSError:
        pass


class _Pipe:
    """Contains Python file objects for two pipe ends.
    Wrapping the FDs in this way ensures that they will
    be freed on error."""
    # File object for the read end of the pipe (opened with mode 'r').
    readable = None
    # File object for the write end of the pipe (opened with mode 'w').
    writeable = None

    def __init__(self):
        """Creates a new pipe and wraps both ends in file objects.
        If either end cannot be wrapped, both descriptors are released
        and the error is re-raised."""
        r, w = os.pipe()
        try:
            self.readable = os.fdopen(r, 'r')
        except:
            # Neither end has an owner yet, so release both. Closing only
            # the read end would leak the write descriptor.
            _close_fd_quietly(r)
            _close_fd_quietly(w)
            raise
        try:
            self.writeable = os.fdopen(w, 'w')
        except:
            _close_fd_quietly(w)
            # The read end is already owned by a file object; close it now
            # instead of waiting for garbage collection.
            self.readable.close()
            raise