Also handle invalid UTF-8
[iotop.git] / iotop / data.py
blob0c480c75d9034c80544ae3a5e54e533d05e9791e
1 import errno
2 import os
3 import pwd
4 import socket
5 import struct
6 import sys
7 import time
9 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
10 from genetlink import Controller, GeNlMessage
13 # Check for requirements:
14 # o Python >= 2.5 for AF_NETLINK sockets
15 # o Linux >= 2.6.20 with I/O accounting
# socket.NETLINK_ROUTE is used as the probe for AF_NETLINK support in the
# running interpreter.
python25 = hasattr(socket, 'NETLINK_ROUTE')

# The presence of /proc/self/io is used as the probe for kernel I/O
# accounting.
ioaccounting = os.path.exists('/proc/self/io')

if not (python25 and ioaccounting):
    def boolean2string(boolean):
        # Deliberately the and/or idiom rather than a conditional expression:
        # this code has to parse on interpreters older than 2.5, which is
        # precisely when the message below is needed.
        return boolean and 'Found' or 'Not found'

    # Report every requirement with its status, then abort start-up.
    report = [
        'Could not run iotop as some of the requirements are not met:',
        ' '.join(['- Python >= 2.5 for AF_NETLINK support:',
                  boolean2string(python25)]),
        ' '.join(['- Linux >= 2.6.20 with I/O accounting support:',
                  boolean2string(ioaccounting)]),
    ]
    sys.stdout.write('\n'.join(report) + '\n')
    sys.exit(1)
35 # Netlink usage for taskstats
# Command and attribute identifiers used when building taskstats requests.
TASKSTATS_CMD_GET = 1
TASKSTATS_CMD_ATTR_PID = 1
TASKSTATS_CMD_ATTR_TGID = 2


class TaskStatsNetlink(object):
    """Fetch per-task accounting counters through a generic netlink socket.

    The family id of 'TASKSTATS' is resolved once at construction time and
    reused for every request.
    """

    # Keep in sync with human_stats(stats, duration) and pinfo.did_some_io()
    # Each entry is (counter name, byte offset inside the reply data); every
    # counter is read as one native unsigned 64-bit integer.
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264),
    ]

    def __init__(self, options):
        """Open the netlink connection and look up the TASKSTATS family.

        `options` must expose a `processes` attribute; it selects whether
        requests are made per thread-group (TGID) or per task (PID).
        """
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def get_task_stats(self, pid):
        """Return {counter name: value} for `pid`, or None.

        None is returned when the kernel reports ESRCH (the task is gone) or
        when the reply is too short to contain every counter.  Any other
        OSError raised while receiving is propagated.
        """
        attr = TASKSTATS_CMD_ATTR_PID
        if self.options.processes:
            attr = TASKSTATS_CMD_ATTR_TGID

        query = GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                            attrs=[U32Attr(attr, pid)],
                            flags=NLM_F_REQUEST)
        query.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            # sys.exc_info() keeps this handler parseable on every Python
            # version the file supports.
            if sys.exc_info()[1].errno == errno.ESRCH:
                # OSError: Netlink error: No such process (3)
                return None
            raise

        # 292 = 20 bytes skipped in front of the reply data + 272 bytes
        # needed to reach the end of the last counter (offset 264 + 8).
        if len(reply.payload) < 292:
            # Short reply
            return None

        reply_length, reply_type = struct.unpack('HH', reply.payload[4:8])
        reply_version = struct.unpack('H', reply.payload[20:22])[0]
        assert reply_length >= 288
        # NOTE(review): attr + 3 presumably maps the request attribute to the
        # matching aggregate reply type -- confirm against taskstats.h.
        assert reply_type == attr + 3
        assert reply_version >= 4

        reply_data = reply.payload[20:]
        return dict([
            (name, struct.unpack('Q', reply_data[offset: offset + 8])[0])
            for name, offset in TaskStatsNetlink.members_offsets
        ])
93 # PIDs manipulations
def find_uids(options):
    """Translate options.users into numeric UIDs stored in options.uids.

    Each entry of options.users (which may be None or empty) is either a
    numeric UID or a login name looked up in the password database.  Every
    unknown login name is reported on stderr; if at least one was found the
    program exits with status 1 once all entries have been examined.
    """
    options.uids = []
    lookup_failed = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            # Not a number: treat it as a login name.
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                sys.stderr.write('%s %s\n' % ('Unknown user:', user))
                lookup_failed = True
        if lookup_failed:
            # Keep scanning so that every unknown user gets reported, but
            # stop collecting: the list is discarded by the exit below.
            continue
        options.uids.append(uid)
    if lookup_failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode the byte string `s` as UTF-8, tolerating invalid input.

    Valid UTF-8 yields the decoded text.  When decoding fails the original
    bytes are returned with the 'string_escape' codec applied instead, so
    the caller always gets something displayable.
    """
    try:
        text = s.decode('utf-8')
    except UnicodeDecodeError:
        text = s.encode('string_escape')
    return text
121 class pinfo(object):
122 def __init__(self, pid, options):
123 self.mark = False
124 self.pid = pid
125 self.stats = {}
126 for name, offset in TaskStatsNetlink.members_offsets:
127 self.stats[name] = (0, 0) # Total, Delta
128 self.parse_status('/proc/%d/status' % pid, options)
130 def check_if_valid(self, uid, options):
131 self.valid = options.pids or not options.uids or uid in options.uids
133 def parse_status(self, path, options):
134 for line in open(path):
135 if line.startswith('Name:'):
136 # Name kernel threads
137 self.name = '[' + line.split()[1].strip() + ']'
138 elif line.startswith('Uid:'):
139 uid = int(line.split()[1])
140 # We check monitored PIDs only here
141 self.check_if_valid(uid, options)
142 try:
143 self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
144 except KeyError:
145 self.user = str(uid)
146 break
148 def add_stats(self, stats):
149 self.stats_timestamp = time.time()
150 for name, value in stats.iteritems():
151 prev_value = self.stats[name][0]
152 self.stats[name] = (value, value - prev_value)
154 def get_cmdline(self):
155 # A process may exec, so we must always reread its cmdline
156 try:
157 proc_cmdline = open('/proc/%d/cmdline' % self.pid)
158 cmdline = proc_cmdline.read(4096)
159 except IOError:
160 return '{no such process}'
161 parts = cmdline.split('\0')
162 if parts[0].startswith('/'):
163 first_command_char = parts[0].rfind('/') + 1
164 parts[0] = parts[0][first_command_char:]
165 cmdline = ' '.join(parts).strip()
166 return safe_utf8_decode(cmdline or self.name)
168 def did_some_io(self):
169 for name in self.stats:
170 if self.stats[name][1]:
171 return True
173 return False
175 class ProcessList(object):
176 def __init__(self, taskstats_connection, options):
177 # {pid: pinfo}
178 self.processes = {}
179 self.taskstats_connection = taskstats_connection
180 self.options = options
181 self.timestamp = time.time()
183 # A first time as we are interested in the delta
184 self.update_process_counts()
186 def get_process(self, pid):
187 process = self.processes.get(pid, None)
188 if not process:
189 try:
190 process = pinfo(pid, self.options)
191 except IOError:
192 # IOError: [Errno 2] No such file or directory: '/proc/...'
193 return
194 if not process.valid:
195 return
196 self.processes[pid] = process
197 return process
199 def list_pids(self, tgid):
200 if self.options.processes or self.options.pids:
201 return [tgid]
202 try:
203 return map(int, os.listdir('/proc/%d/task' % tgid))
204 except OSError:
205 return []
207 def update_process_counts(self):
208 new_timestamp = time.time()
209 self.duration = new_timestamp - self.timestamp
210 self.timestamp = new_timestamp
211 total_read = total_write = 0
212 tgids = self.options.pids or [int(tgid) for tgid in os.listdir('/proc')
213 if '0' <= tgid[0] and tgid[0] <= '9']
214 for tgid in tgids:
215 for pid in self.list_pids(tgid):
216 process = self.get_process(pid)
217 if process:
218 stats = self.taskstats_connection.get_task_stats(pid)
219 if stats:
220 process.mark = False
221 process.add_stats(stats)
222 total_read += process.stats['read_bytes'][1]
223 total_write += process.stats['write_bytes'][1]
224 return total_read, total_write
226 def refresh_processes(self):
227 for process in self.processes.values():
228 process.mark = True
229 total_read_and_write = self.update_process_counts()
230 to_delete = []
231 for pid, process in self.processes.iteritems():
232 if process.mark:
233 to_delete.append(pid)
234 for pid in to_delete:
235 del self.processes[pid]
236 return total_read_and_write