# commit d4cab23b1c8c2f91ae7b353087bc60e7659620ef broke iotop -o
# [iotop.git] / iotop / data.py
# blob 5bb096fc37c05855b6b1e0f5901488f252b6f6a6
1 import errno
2 import glob
3 import os
4 import pprint
5 import pwd
6 import socket
7 import stat
8 import struct
9 import sys
10 import time
# Check for requirements:
#   o Python >= 2.5 for AF_NETLINK sockets
#   o Linux >= 2.6.20 with I/O accounting

# The socket module only exposes NETLINK_ROUTE when AF_NETLINK is available,
# so the attribute doubles as the Python version probe.
python25 = hasattr(socket, 'NETLINK_ROUTE')

# /proc/self/io is used as the marker for kernel I/O accounting support.
ioaccounting = os.path.exists('/proc/self/io')

if not (python25 and ioaccounting):
    def boolean2string(boolean):
        """Render the outcome of one requirement probe for the report."""
        # The and/or form (rather than a conditional expression) keeps this
        # check parseable by the pre-2.5 interpreters it is meant to reject.
        return boolean and 'Found' or 'Not found'

    report = [
        'Could not run iotop as some of the requirements are not met:',
        ' '.join(['- Python >= 2.5 for AF_NETLINK support:',
                  boolean2string(python25)]),
        ' '.join(['- Linux >= 2.6.20 with I/O accounting support '
                  '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
                  'CONFIG_TASK_IO_ACCOUNTING):',
                  boolean2string(ioaccounting)]),
    ]
    sys.stdout.write('\n'.join(report) + '\n')
    sys.exit(1)
36 from iotop import ioprio, vmstat
37 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
38 from genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class whose repr shows the type and every instance attribute."""

    def __repr__(self):
        # str(type(self)) ends with '>'; it is cut off here and re-added by
        # the format string so the attribute dump lands inside the brackets.
        type_label = str(type(self))
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (type_label[:-1], attributes)
47 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """Counters picked out of one taskstats buffer.

    Each name in members_offsets becomes an instance attribute holding the
    8-byte 'Q' value read at the associated byte offset.
    """

    # (attribute name, byte offset inside the taskstats buffer)
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Class-wide flag: flips to True once any reply carried a non-zero
    # blkio_delay_total.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        """Decode every member of members_offsets from task_stats_buffer."""
        fields = self.__dict__
        for member, start in Stats.members_offsets:
            raw = task_stats_buffer[start:start + 8]
            (fields[member],) = struct.unpack('Q', raw)

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
        # the kernel.
        if not Stats.has_blkio_delay_total and self.blkio_delay_total != 0:
            Stats.has_blkio_delay_total = True

    def accumulate(self, other_stats, destination, coeff=1):
        """Store self + coeff * other_stats into destination, per member."""
        target = destination.__dict__
        mine = self.__dict__
        theirs = other_stats.__dict__
        for member, _ in Stats.members_offsets:
            target[member] = mine[member] + coeff * theirs[member]

    def delta(self, other_stats, destination):
        """Update destination with self - other_stats"""
        return self.accumulate(other_stats, destination, -1)

    def is_all_zero(self):
        """Return True when every tracked member equals zero."""
        values = self.__dict__
        return not any(values[member] != 0
                       for member, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats whose members are all 0, bypassing __init__."""
        blank = Stats.__new__(Stats)
        blank.__dict__.update((member, 0)
                              for member, _ in Stats.members_offsets)
        return blank
100 # Netlink usage for taskstats
# Command number placed in every taskstats request (passed as cmd= to
# GeNlMessage by TaskStatsNetlink.build_request()).
TASKSTATS_CMD_GET = 1
# Attribute type used for the thread id in a request; the reply type checked
# in TaskStatsNetlink.get_single_task_stats() is this value + 3.
TASKSTATS_CMD_ATTR_PID = 1
class TaskStatsNetlink(object):
    """Sends taskstats requests over a generic netlink connection."""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        """Open the connection and look up the 'TASKSTATS' family id."""
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Return the request message asking for the stats of thread tid."""
        pid_attr = U32Attr(TASKSTATS_CMD_ATTR_PID, tid)
        return GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                           attrs=[pid_attr], flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send the thread's prepared request and decode the reply.

        Returns a Stats instance, or None when the thread no longer exists
        (ESRCH) or the reply is too short to be decoded.  Any other OSError
        raised while receiving is propagated.
        """
        thread.task_stats_request.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            if sys.exc_info()[1].errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None

        payload = reply.payload
        if len(payload) < 292:
            # Short reply
            return None

        reply_length, reply_type = struct.unpack('HH', payload[4:8])
        (reply_version,) = struct.unpack('H', payload[20:22])
        assert reply_length >= 288
        assert reply_type == TASKSTATS_CMD_ATTR_PID + 3
        assert reply_version >= 4
        # The Stats offsets are relative to byte 20 of the payload.
        return Stats(payload[20:])
142 # PIDs manipulations
def find_uids(options):
    """Fill options.uids with the numeric UIDs named by options.users.

    Entries that parse as integers are used as UIDs directly; the others are
    looked up in the password database.  Each unknown name is reported on
    stderr and, once every entry has been examined, the process exits with
    status 1 if at least one lookup failed.
    """
    resolved = []
    options.uids = resolved
    failed = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                sys.stderr.write('%s %s\n' % ('Unknown user:', user))
                failed = True
        # After the first failure nothing more is collected: uid may be
        # unbound and the function is going to exit anyway.
        if not failed:
            resolved.append(uid)
    if failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode s as UTF-8, or return its 'string_escape' encoding when the
    bytes are not valid UTF-8."""
    try:
        decoded = s.decode('utf-8')
    except UnicodeDecodeError:
        decoded = s.encode('string_escape')
    return decoded
class ThreadInfo(DumpableObject):
    """Stats for a single thread"""

    def __init__(self, tid, taskstats_connection):
        """Prepare the bookkeeping and the taskstats request for thread tid.

        taskstats_connection must provide build_request(tid); the message it
        returns is kept so the same request can be re-sent on every refresh.
        """
        self.tid = tid
        # True until a refresh pass has obtained stats for this thread;
        # ProcessInfo.update_stats() drops threads that are still marked.
        self.mark = True
        self.stats_total = None
        # Start from an all-zero delta rather than a member-less Stats:
        # is_all_zero() and accumulate() index the members by name and would
        # raise KeyError if this thread were consulted before its first
        # successful update_stats().
        self.stats_delta = Stats.build_all_zero()
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return the I/O priority reported by ioprio.get() for this thread."""
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority class and data to this thread."""
        return ioprio.set_ioprio(ioprio.IOPRIO_WHO_PROCESS, self.tid,
                                 ioprio_class, ioprio_data)

    def update_stats(self, stats):
        """Record a fresh Stats sample and refresh stats_delta.

        stats_delta becomes the difference between `stats` and the previous
        sample; on the very first sample the difference is therefore zero.
        """
        if self.stats_total is None:
            self.stats_total = stats
        stats.delta(self.stats_total, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""

    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Return False when the pid or uid filters in options exclude us."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the owner UID of /proc/<pid>, or None if it disappeared.

        A known non-zero UID is returned from the cache without touching
        /proc again.
        """
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the owner's user name, the UID as a string when it has no
        passwd entry, or '{none}' when the UID is unknown."""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except KeyError:
                self.user = str(uid)
        return self.user or '{none}'

    def get_proc_status_name(self):
        """Return '[name]' built from the first line of /proc/<pid>/status.

        Returns '{no such process}' when the file cannot be opened or read
        and '{no name}' when the first line carries no name.
        """
        try:
            proc_status = open('/proc/%d/status' % self.pid)
            try:
                # The read is guarded as well: the process may exit between
                # the open() and the readline().
                first_line = proc_status.readline()
            finally:
                proc_status.close()
        except IOError:
            return '{no such process}'
        prefix = 'Name:\t'
        name = ''
        if first_line.startswith(prefix):
            name = first_line[len(prefix):].strip()
        if not name:
            return '{no name}'
        return '[%s]' % name

    def get_cmdline(self):
        """Return the command line to display for this process.

        The directory part of the executable is stripped.  When the command
        line is empty the name from /proc/<pid>/status is used instead, and
        '{no such process}' is returned when /proc/<pid>/cmdline cannot be
        read.
        """
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            return self.get_proc_status_name()
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline)

    def did_some_io(self, accumulated):
        """Return True if any counter is non-zero.

        With `accumulated` set the accumulated stats are inspected, otherwise
        the latest delta of every thread.
        """
        if accumulated:
            return not self.stats_accum.is_all_zero()
        return not all(t.stats_delta.is_all_zero() for
                       t in self.threads.values())

    def get_ioprio(self):
        """Return the I/O priority shared by all threads, or '?dif' when
        they do not all report the same one."""
        priorities = set(t.get_ioprio() for t in self.threads.values())
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority to every thread of the process."""
        for thread in self.threads.values():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating it on first use."""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Fold the per-thread deltas into the process-level stats.

        Threads still marked are removed.  Returns False, leaving the stats
        untouched, when no thread remains; otherwise stores the summed delta
        (with both delay totals averaged over the threads), adds it to
        stats_accum and returns True.
        """
        stats_delta = Stats.build_all_zero()
        # Iterate over a snapshot: entries are deleted from the dict below.
        for tid, thread in list(self.threads.items()):
            if thread.mark:
                del self.threads[tid]
            else:
                stats_delta.accumulate(thread.stats_delta, stats_delta)

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # Explicit integer division; the counters are integers.
        stats_delta.blkio_delay_total //= nr_threads
        stats_delta.swapin_delay_total //= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """Keeps one ProcessInfo per monitored pid and refreshes their stats."""

    def __init__(self, taskstats_connection, options):
        self.processes = {}  # {pid: ProcessInfo}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Either get the specified PID from self.processes or build a new
        ProcessInfo if we see this PID for the first time"""
        if pid not in self.processes:
            self.processes[pid] = ProcessInfo(pid)
        candidate = self.processes[pid]

        # Processes excluded by the filters stay registered but are not
        # handed out.
        if not candidate.is_monitored(self.options):
            return None
        return candidate

    def list_tgids(self):
        """Return the ids to walk: options.pids when given, else every
        numeric /proc entry (process mode) or every thread id found under
        them (thread mode)."""
        if self.options.pids:
            return self.options.pids

        numeric_entries = [entry for entry in os.listdir('/proc')
                           if '0' <= entry[0] <= '9']
        if self.options.processes:
            return [int(entry) for entry in numeric_entries]

        tids = []
        for entry in numeric_entries:
            try:
                task_names = os.listdir('/proc/' + entry + '/task')
            except OSError:
                # The PID went away
                continue
            tids.extend([int(name) for name in task_names])
        return tids

    def list_tids(self, tgid):
        """Return the thread ids to query for tgid."""
        if not self.options.processes:
            return [tgid]

        try:
            task_names = os.listdir('/proc/%d/task' % tgid)
        except OSError:
            return []
        tids = [int(name) for name in task_names]

        if self.options.pids:
            tids = list(set(self.options.pids).intersection(set(tids)))

        return tids

    def update_process_counts(self):
        """Query taskstats for every monitored thread and return
        self.vmstat.delta()."""
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        connection = self.taskstats_connection
        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, connection)
                stats = connection.get_single_task_stats(thread)
                if not stats:
                    continue
                # Only threads that returned stats are flagged as seen.
                thread.update_stats(stats)
                thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Run one refresh cycle and return what update_process_counts()
        returned; processes left without threads are forgotten."""
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        # Snapshot the items: entries are removed while looping.
        for pid, process in list(self.processes.items()):
            if not process.update_stats():
                del self.processes[pid]

        return total_read_and_write

    def clear(self):
        """Forget every known process."""
        self.processes = {}