Don't crash when a thread just disappeared
[iotop.git] / iotop / data.py
blob00b43a1c11bf36b80f195dc35d0e275d83307786
1 import errno
2 import os
3 import pprint
4 import pwd
5 import socket
6 import stat
7 import struct
8 import sys
9 import time
11 from iotop import ioprio
12 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
13 from genetlink import Controller, GeNlMessage
# Check for requirements:
#   o Python >= 2.5 for AF_NETLINK sockets
#   o Linux >= 2.6.20 with I/O accounting
#
# NOTE: this check has to remain parseable by interpreters older than the
# one it asks for, otherwise they fail with a SyntaxError instead of
# printing the explanation below.

# socket.NETLINK_ROUTE is used as the probe for AF_NETLINK support.
python25 = hasattr(socket, 'NETLINK_ROUTE')

# /proc/self/io is used as the probe for kernel I/O accounting support.
ioaccounting = os.path.exists('/proc/self/io')

if not (python25 and ioaccounting):
    def boolean2string(boolean):
        """Render the outcome of a requirement probe for the report."""
        if boolean:
            return 'Found'
        return 'Not found'

    report = [
        'Could not run iotop as some of the requirements are not met:',
        ' '.join(('- Python >= 2.5 for AF_NETLINK support:',
                  boolean2string(python25))),
        ' '.join(('- Linux >= 2.6.20 with I/O accounting support '
                  '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
                  'CONFIG_TASK_IO_ACCOUNTING):',
                  boolean2string(ioaccounting))),
    ]
    # One report line per output line, each terminated by a newline.
    for report_line in report:
        sys.stdout.write(report_line + '\n')
    sys.exit(1)
class DumpableObject(object):
    """Base class giving subclasses a repr that dumps their attributes."""

    def __repr__(self):
        """Return the type description followed by the instance __dict__.

        The closing '>' of str(type(self)) is cut off and re-added after
        the pretty-printed attribute dictionary.
        """
        type_text = str(type(self))
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (type_text[:-1], attributes)
44 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """Counters decoded from one taskstats buffer.

    Every name listed in ``members_offsets`` is exposed as an integer
    attribute of the instance.
    """

    # (attribute name, offset of its 8-byte value inside the buffer handed
    # to __init__)
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    def __init__(self, task_stats_buffer):
        """Decode each field of ``members_offsets`` from the buffer.

        Each value is the 8-byte slice starting at the field's offset,
        unpacked with the struct format 'Q'.
        """
        for field, start in Stats.members_offsets:
            raw = task_stats_buffer[start:start + 8]
            value, = struct.unpack('Q', raw)
            setattr(self, field, value)

    def accumulate(self, other_stats, operator=sum):
        """Return a new Stats combining self and other_stats per field.

        ``operator`` is called once per field with the tuple
        ``(value_of_self, value_of_other_stats)``; the default adds them.
        """
        combined = Stats.__new__(Stats)
        for field, _ in Stats.members_offsets:
            pair = (getattr(self, field), getattr(other_stats, field))
            setattr(combined, field, operator(pair))
        return combined

    def delta(self, other_stats):
        """Return a new Stats holding self minus other_stats per field."""
        return self.accumulate(other_stats,
                               operator=lambda pair: pair[0] - pair[1])

    def is_all_zero(self):
        """Tell whether every field of this instance equals zero."""
        return not any(getattr(self, field) != 0
                       for field, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats whose fields are all 0, bypassing __init__."""
        blank = Stats.__new__(Stats)
        for field, _ in Stats.members_offsets:
            setattr(blank, field, 0)
        return blank
88 # Netlink usage for taskstats
# Command number placed in the generic netlink requests sent to the
# TASKSTATS family.
TASKSTATS_CMD_GET = 1
# Type of the U32 request attribute that carries the PID being queried.
TASKSTATS_CMD_ATTR_PID = 1
class TaskStatsNetlink(object):
    """Fetch Stats objects through the TASKSTATS generic netlink family."""

    # Keep in sync with human_stats(stats, duration) and pinfo.did_some_io()

    def __init__(self, options):
        """Open the netlink connection and resolve the TASKSTATS family id.

        ``options.processes`` is read later by get_task_stats().
        """
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id(
            'TASKSTATS')

    def get_single_task_stats(self, pid):
        """Request the statistics of the single task ``pid``.

        Returns a Stats instance, or None when receiving the reply fails
        with ESRCH (no such process) or when the reply payload is shorter
        than 292 bytes.  Any other OSError is re-raised.
        """
        attributes = [U32Attr(TASKSTATS_CMD_ATTR_PID, pid)]
        query = GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                            attrs=attributes, flags=NLM_F_REQUEST)
        query.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            # The exception is fetched through sys.exc_info() rather than
            # bound in the except clause.
            if sys.exc_info()[1].errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None
        payload = reply.payload
        if len(payload) < 292:
            # Short reply
            return None
        reply_length, reply_type = struct.unpack('HH', payload[4:8])
        reply_version, = struct.unpack('H', payload[20:22])
        assert reply_length >= 288
        assert reply_type == TASKSTATS_CMD_ATTR_PID + 3
        assert reply_version >= 4
        # The statistics start 20 bytes into the payload; the offsets used
        # by Stats are relative to that point.
        return Stats(payload[20:])

    def get_task_stats(self, pid):
        """Return the Stats of ``pid``, or None when none could be read.

        With ``options.processes`` set, the statistics of every entry of
        /proc/<pid>/task are combined: all fields are summed and the two
        delay totals are then divided by the number of tasks that
        answered.  Otherwise only ``pid`` itself is queried.
        """
        if self.options.processes:
            # We don't use TASKSTATS_CMD_ATTR_TGID as it's only half
            # implemented in the kernel
            try:
                entries = os.listdir('/proc/%d/task' % pid)
            except OSError:
                # Pid not found
                entries = []
            pids = [int(entry) for entry in entries]
        else:
            pids = [pid]

        collected = []
        for tid in pids:
            task_stats = self.get_single_task_stats(tid)
            if task_stats:
                collected.append(task_stats)
        if not collected:
            return None

        res = collected[0]
        for extra in collected[1:]:
            res = res.accumulate(extra)
        count = len(collected)
        res.blkio_delay_total /= count
        res.swapin_delay_total /= count
        return res
151 # PIDs manipulations
def find_uids(options):
    """Translate ``options.users`` into numeric UIDs in ``options.uids``.

    Each entry of ``options.users`` (which may be None or empty) is first
    tried as a number and otherwise looked up as a user name in the
    password database.  Unknown names are reported on stderr; once every
    entry has been examined the program exits with status 1 if any lookup
    failed.  No UID is recorded from the first failure onwards.
    """
    resolved = []
    options.uids = resolved
    failed = False
    for entry in options.users or []:
        try:
            uid = int(entry)
        except ValueError:
            try:
                uid = pwd.getpwnam(entry).pw_uid
            except KeyError:
                sys.stderr.write(
                    ' '.join(('Unknown user:', str(entry))) + '\n')
                failed = True
        if not failed:
            resolved.append(uid)
    if failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode the byte string ``s`` as UTF-8.

    When ``s`` is not valid UTF-8, its 'string_escape' encoding is
    returned instead of raising.
    """
    try:
        decoded = s.decode('utf-8')
    except UnicodeDecodeError:
        decoded = s.encode('string_escape')
    return decoded
class pinfo(DumpableObject):
    """Identity, owner and I/O statistics of one monitored PID."""

    def __init__(self, pid, options):
        """Create the record for ``pid`` with all-zero statistics.

        ``options`` is accepted for interface compatibility; it is not
        used here.
        """
        # Entries whose mark is still True after a refresh are discarded
        # by ProcessList.refresh_processes().
        self.mark = True
        self.pid = pid
        self.uid = None
        self.user = None
        self.stats_total = Stats.build_all_zero()
        self.stats_delta = Stats.build_all_zero()

    def is_monitored(self, options):
        """Tell whether this PID passes the PID and user filters."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the UID owning /proc/<pid>, or None if it is gone.

        A non-zero UID is cached and returned without touching /proc
        again.
        """
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the
        # process runs as root so it can change its UID. In both cases it
        # means we have to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid(): the cached user name no
            # longer matches.
            self.user = None
            # Remember the UID so that the early return above can hit and
            # so that get_user() does not redo its pwd lookup on each call.
            self.uid = uid
        return uid

    def get_user(self):
        """Return the owner's user name, or '{none}' when unknown.

        Falls back to the numeric UID as a string when the UID has no
        entry in the password database.
        """
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except KeyError:
                self.user = str(uid)
        return self.user or '{none}'

    def get_proc_status_name(self):
        """Return the name from the first line of /proc/<pid>/status.

        Returns '{no such process}' when the file cannot be opened or
        read, and '{no name}' when the first line carries no name.
        """
        try:
            proc_status = open('/proc/%d/status' % self.pid)
            # Nested try/finally: the file is always closed, and a read
            # error (the task may exit right after the open) is handled
            # like a failed open instead of propagating.
            try:
                first_line = proc_status.readline()
            finally:
                proc_status.close()
        except IOError:
            return '{no such process}'
        prefix = 'Name:\t'
        if first_line.startswith(prefix):
            name = first_line[len(prefix):].strip()
        else:
            name = ''
        return name or '{no name}'

    def get_cmdline(self):
        """Return a printable command line for this PID.

        The directory part of the executable is dropped when it is an
        absolute path.  An empty cmdline falls back to the name found in
        /proc/<pid>/status; an unreadable one yields '{no such process}'.
        """
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            return self.get_proc_status_name()
        # Arguments are separated by NUL bytes in /proc/PID/cmdline
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline)

    def add_stats(self, stats):
        """Record a new totals sample and derive the delta from the last.

        Also refreshes the timestamp and the I/O priority of the PID.
        """
        self.stats_timestamp = time.time()
        self.stats_delta = stats.delta(self.stats_total)
        self.stats_total = stats
        self.ioprio = ioprio.get(self.pid)

    def did_some_io(self):
        """Tell whether any field of the last delta is non-zero."""
        return not self.stats_delta.is_all_zero()

    def ioprio_sort_key(self):
        """Return the sort key of the I/O priority set by add_stats()."""
        return ioprio.sort_key(self.ioprio)
class ProcessList(DumpableObject):
    """Collection of pinfo records kept up to date from taskstats."""

    def __init__(self, taskstats_connection, options):
        """Remember the collaborators and take a first sample."""
        # {pid: pinfo}
        self.processes = {}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Return the pinfo of ``pid``, creating and caching it if needed.

        Returns None when the record cannot be created or when the PID is
        filtered out by the options; such PIDs are not cached.
        """
        known = self.processes.get(pid)
        if known:
            return known
        try:
            candidate = pinfo(pid, self.options)
        except IOError:
            # IOError: [Errno 2] No such file or directory: '/proc/...'
            return None
        if not candidate.is_monitored(self.options):
            return None
        self.processes[pid] = candidate
        return candidate

    def list_pids(self, tgid):
        """Return the PIDs to sample for ``tgid``.

        That is ``tgid`` alone when working per process or on an explicit
        PID list, otherwise every entry of /proc/<tgid>/task (an empty
        list if that directory cannot be listed).
        """
        opts = self.options
        if opts.processes or opts.pids:
            return [tgid]
        try:
            entries = os.listdir('/proc/%d/task' % tgid)
        except OSError:
            return []
        return [int(entry) for entry in entries]

    def update_process_counts(self):
        """Sample every candidate PID and return (read, written) deltas.

        Updates ``duration`` and ``timestamp``, clears the mark of every
        record that received statistics, and sums the read_bytes and
        write_bytes deltas of those records.
        """
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        tgids = self.options.pids
        if not tgids:
            # Entries of /proc starting with a digit are taken as PIDs
            tgids = [int(entry) for entry in os.listdir('/proc')
                     if '0' <= entry[0] <= '9']

        total_read = 0
        total_write = 0
        for tgid in tgids:
            for pid in self.list_pids(tgid):
                process = self.get_process(pid)
                if not process:
                    continue
                stats = self.taskstats_connection.get_task_stats(pid)
                if not stats:
                    continue
                process.mark = False
                process.add_stats(stats)
                total_read += process.stats_delta.read_bytes
                total_write += process.stats_delta.write_bytes
        return total_read, total_write

    def refresh_processes(self):
        """Resample all PIDs and drop the records that got no statistics.

        Returns the (read, written) totals of update_process_counts().
        """
        for process in self.processes.values():
            process.mark = True
        totals = self.update_process_counts()
        # Collect first, delete afterwards: the dictionary is not modified
        # while it is being walked.
        stale = [pid for pid, process in self.processes.items()
                 if process.mark]
        for pid in stale:
            del self.processes[pid]
        return totals