update
[dumbwifi.git] / ui.py
blob2bf635f14d6fcb5705e8f718be056c25f0474ac1
1 #!/usr/bin/env python
3 import curses
4 import os
5 import stat
6 import traceback
8 from conf import config
9 from output import logger
10 import network
11 import system
12 import time
13 import sys
def init_routine():
    """Run the startup checks in order: root privileges, then configuration."""
    for step in (as_root_check, init_config):
        step()
def as_root_check():
    """Exit with status 1 unless system.as_root() reports root privileges."""
    if system.as_root():
        return
    logger.err("Must run as root", log=False)
    sys.exit(1)
def init_config():
    """Check the config file's permissions, then load the configuration.

    The process exits with status 1 when the config file has any group or
    other permission bit set, or when stat()ing the file or
    config.init_config() raises an Exception (the exception is logged first).
    """
    try:
        # make sure config file is protected from user access
        mode = os.stat(config.configfile)[stat.ST_MODE]
        if mode & (stat.S_IRWXG | stat.S_IRWXO):
            logger.err(
                "Config file %s should only be accessible by root (mode is: %s)"
                % (config.configfile, oct(stat.S_IMODE(mode))))
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept this exit
            sys.exit(1)

        config.init_config()
    except Exception as msg:
        logger.err(msg)
        sys.exit(1)
def tools_check():
    """Report every tool in config.tools as [ok] or [missing], then exit 0.

    A tool counts as present when the config attribute named after it (with
    "-" replaced by "_") holds an absolute path.
    """
    logger.display("Checking for missing tools...", log=False)
    for cmd_name in config.tools:
        attr = cmd_name.replace("-", "_")
        path = vars(config)[attr]
        if not os.path.isabs(path):
            logger.display(" [missing] %s" % cmd_name, log=False)
            continue
        logger.display(" [ok] %s" % path, log=False)
    sys.exit(0)
55 def ip_check():
56 logger.await("Checking for ip")
57 ips = [(x, y) for (x, y) in
58 [(i.interface, network.has_ip(i.interface))
59 for i in config.interfaces]
60 if y != None] # (iface, ip) pairs where ip is set
61 if ips:
62 logger.result(
63 reduce(lambda x, y: "%s %s" % (x, y),
64 map(lambda (x, y): "%s (%s)" % (y, x), ips)))
65 return True
66 else:
67 logger.result("none")
70 def choose_medium():
71 logger.await("Selecting preferred network medium")
72 iface = config.interfaces.get_top()
73 logger.result(iface.medium)
74 if iface.medium == "wired": return True
77 def check_wired_link():
78 logger.await("Checking for wired link")
79 ifaces = config.interfaces.get_all(pred=lambda x: x.medium == "wired")
80 connected = [network.wire_connected(i.interface) for i in ifaces \
81 if network.wire_connected(i.interface)]
82 if connected:
83 logger.result(reduce(lambda x,y: "%s %s" % (x,y),
84 map(lambda x: "%s" % x, connected)))
85 return True
86 else:
87 logger.result("none, use wireless")
90 def request_ip(iface, net=None, tries=3):
91 n = iface.interface
92 if net:
93 n = "%s (from %s)" % (n, net.essid)
94 if not network.setup_wifi(iface.interface, net):
95 logger.err("Failed to associate with access point for network %s"\
96 % net.essid)
97 return
99 attempt = 0
100 while attempt < tries:
101 attempt += 1
102 logger.await("(%d/%d) Request ip on %s" % (attempt, tries, n))
104 start_time = time.time()
105 ip = network.request_ip(iface.interface)
106 stop_time = time.time()
107 duration = stop_time - start_time
109 if ip:
110 logger.result("%s (%ds)" % (ip, duration))
111 return True
112 else: logger.result("failed (%ds)" % duration)
115 def scan_wifi_networks(iface, wardrive=None):
116 logger.await("Scan for wireless networks")
117 nets = network.read_scan(network.normal_scan(iface.interface))
118 nets = config.networks.merge(nets, 'essid')
119 f = lambda x: x.priority != None and x.signal != None
120 if wardrive:
121 f = lambda x: x.signal != None and x.priority == None\
122 and x.encrypted == None and x.essid != "<hidden>"
123 nets = nets.sort(sort_key='quality').get_all(pred=f)
124 if nets:
125 logger.result(reduce(lambda x,y: "%s %s" % (x,y),
126 map(lambda x: "%s" % x.essid, nets[:3])))
127 return nets
128 else: logger.result("none")
def _format_timespan(span):
    """Render a duration in seconds as a rounded count plus unit (s/m/h/d/w).

    Durations below one second are rendered as "<1s".
    """
    prefix = ''
    if span < 1:
        prefix = '<'
        span = 1

    unit = 's'
    # (largest value still shown in the current unit,
    #  how many of the current unit make one of the next, next unit)
    steps = ((59, 60.0, 'm'), (59, 60.0, 'h'), (23, 24.0, 'd'), (6, 7.0, 'w'))
    for limit, factor, next_unit in steps:
        if span <= limit:
            break
        span /= factor
        unit = next_unit

    return "%s%s%s" % (prefix, int(round(span)), unit)


def _column_widths(nets, keys, width, mw, sp):
    """Pick a width per column from the content: longest value plus `sp`,
    clamped to the range [mw, width]."""
    widths = []
    for key in keys:
        if key == 'seen':
            # ugly special case for seen field: sized after its heading,
            # because the stored value is not the text that gets displayed
            longest = len('seen')
        else:
            longest = max([len(net.get(key) or "") + sp for net in nets])
        widths.append(max(min(longest, width), mw))
    return widths


def display_live_networks(nets, field=None, column=None):
    """Format networks as a text table with a header row and a count footer.

    nets   -- collection offering sort(sort_key=...); its items offer get(key)
    field  -- name of the field to sort by; takes precedence over `column`
    column -- 1-based index into the displayed columns to sort by

    Without a usable `field` or `column` the table is sorted by 'signal'.
    Returns the table as a single string, or "No networks detected" when
    `nets` is empty or None.
    """
    if not nets:
        return "No networks detected"

    keys = ['bssid', 'essid', 'channel', 'bitrate', 'sec', 'signal', 'seen']
    width = 19; mw = 6; sp = 2 # width: max width  mw: min width  sp: separator

    if field:
        nets = nets.sort(sort_key=field)
    elif column and 0 < column <= len(keys):
        nets = nets.sort(sort_key=keys[column-1])
    else:
        nets = nets.sort(sort_key='signal')

    ws = _column_widths(nets, keys, width, mw, sp)

    # every row is padded per column, then loses the separator that trails
    # its last column
    header = "".join([key[:w-sp].ljust(w) for (key, w) in zip(keys, ws)])
    lines = [header[:-sp]]

    for net in nets:
        cells = []
        for (key, w) in zip(keys, ws):
            val = net.get(key)

            # ugly special case for seen field: shown as the time elapsed
            # since the stored value; a network without one gets an empty
            # cell rather than a TypeError
            if key == 'seen' and val is not None:
                val = _format_timespan(time.time() - val)

            cells.append((val or "")[:w-sp].ljust(w))
        lines.append("".join(cells)[:-sp])

    lines.append("%d network(s) found" % len(nets))
    return "\n".join(lines)
def curse(iface, timeout):
    """Show a live, repeatedly refreshed list of wireless networks in curses.

    iface   -- passed to network.single_scan() and shown in the header
    timeout -- when > 0, stop scanning once this many seconds have passed;
               otherwise keep scanning until Ctrl+C

    The logger is muted while curses owns the terminal. On the way out the
    terminal is restored and the logger unmuted, whatever the reason for
    leaving, and the last rendered list is printed to stdout. An Exception
    raised while scanning ends the scan and is reported on stderr once the
    terminal is back to normal.
    """
    def curses_init():
        logger.mute(quiet=True)

        scr = curses.initscr()
        try:
            curses.noecho()
            curses.cbreak()
            scr.keypad(1)
            curses.curs_set(0)
        except Exception:
            # the caller never receives scr when setup fails half way, so
            # curses mode has to be left here
            curses.endwin()
            raise
        return scr

    def curses_shutdown(scr):
        # scr is None when curses_init() did not complete; there is no
        # screen to reset then, but the logger still has to be unmuted
        if scr is not None:
            curses.nocbreak()
            scr.keypad(0)
            curses.echo()
            curses.endwin()

        logger.unmute(quiet=True)

    def display_list(scr, header, status, list_s):
        if list_s:
            ss = list_s.split("\n")
            scr.clear()
            try:
                scr.addstr(header + " / Ctrl+C to exit\n")
                if status:
                    scr.addstr(status + "\n")
                scr.addstr("\n"+ss[0]+"\n", curses.A_REVERSE)

                # add one by one until possible exception is thrown
                # when the list is too long
                for s in ss[1:]:
                    scr.addstr(s+"\n")
            except curses.error:
                # only the curses error is expected here; anything else,
                # Ctrl+C in particular, must not be swallowed
                pass
            scr.refresh()

    def get_status(timeout, t):
        if timeout > 0:
            return "-> scanning for %ds (-%ds)" % (timeout, t-time.time())
        return ""

    list_s = ""
    t = time.time()+timeout
    header = "%s scanning for wireless networks (%s)" % \
        (config.program_name, iface)
    status = get_status(timeout, t)

    scr = None
    failure = None
    try:
        try:
            scr = curses_init()
            display_list(scr, header, status, display_live_networks(None))

            while 1:
                status = get_status(timeout, t)
                if timeout > 0 and t < time.time():
                    break

                scan_data = network.single_scan(iface)
                nets = network.read_scan(scan_data)
                list_s = display_live_networks(nets)

                display_list(scr, header, status, list_s)

                curses.napms(500)

        except KeyboardInterrupt:
            # Ctrl+C is the advertised way to leave the live view
            pass
        except Exception:
            # keep the report until curses has released the terminal,
            # otherwise it would be drawn onto the curses screen and lost
            failure = traceback.format_exc()
    finally:
        curses_shutdown(scr)

    if failure is not None:
        traceback.print_stack()
        sys.stderr.write(failure)

    print(list_s)