80 columns
[iotop.git] / iotop / data.py
blob757a5195b4a9cb89be6ceb2959444e531ffa2d11
1 import errno
2 import glob
3 import os
4 import pprint
5 import pwd
6 import socket
7 import stat
8 import struct
9 import sys
10 import time
12 from iotop import ioprio, vmstat
13 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
14 from genetlink import Controller, GeNlMessage
17 # Check for requirements:
18 # o Python >= 2.5 for AF_NETLINK sockets
19 # o Linux >= 2.6.20 with I/O accounting
21 try:
22 socket.NETLINK_ROUTE
23 python25 = True
24 except AttributeError:
25 python25 = False
27 ioaccounting = os.path.exists('/proc/self/io')
29 if not python25 or not ioaccounting:
30 def boolean2string(boolean):
31 return boolean and 'Found' or 'Not found'
32 print 'Could not run iotop as some of the requirements are not met:'
33 print '- Python >= 2.5 for AF_NETLINK support:', boolean2string(python25)
34 print '- Linux >= 2.6.20 with I/O accounting support ' \
35 '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
36 'CONFIG_TASK_IO_ACCOUNTING):', \
37 boolean2string(ioaccounting)
38 sys.exit(1)
class DumpableObject(object):
    """Base class whose repr() shows every instance attribute.

    The result is the string form of type(self) with its final character
    (the closing '>') moved after a pretty-printed copy of the instance
    __dict__, which makes printed objects easy to inspect.
    """

    def __repr__(self):
        type_text = str(type(self))
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (type_text[:-1], attributes)
# Interesting fields in a taskstats output

class Stats(DumpableObject):
    """A handful of counters decoded from a raw taskstats buffer.

    Every (name, offset) pair in members_offsets names an instance attribute
    and the byte offset, in the buffer given to __init__, of the 8-byte
    native unsigned integer ('Q') it is read from.  members_offsets is the
    single source of truth for the field list: every method iterates it.
    """
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Becomes True (class-wide) once any decoded buffer reports a non-zero
    # blkio_delay_total; see the heuristic in __init__.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        """Decode every field listed in members_offsets.

        task_stats_buffer must be at least 272 bytes long (last offset 264
        plus 8); a shorter slice makes struct.unpack raise struct.error.
        """
        for name, offset in Stats.members_offsets:
            data = task_stats_buffer[offset:offset + 8]
            setattr(self, name, struct.unpack('Q', data)[0])

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
        # the kernel.
        if not Stats.has_blkio_delay_total:
            Stats.has_blkio_delay_total = self.blkio_delay_total != 0

    def accumulate(self, other_stats, destination, operator=sum):
        """Update destination from operator(self, other_stats)

        For each field f in members_offsets, set destination.f to
        operator((self.f, other_stats.f)).  destination may be self or
        other_stats: each field is read before it is written.
        """
        dd = destination.__dict__
        sd = self.__dict__
        od = other_stats.__dict__
        for name, _ in Stats.members_offsets:
            dd[name] = operator((sd[name], od[name]))

    def delta(self, other_stats, destination):
        """Update destination with self - other_stats"""
        # Explicit unpacking instead of a tuple parameter: tuple parameters
        # are deprecated and are a syntax error on Python 3 (PEP 3113).
        def subtract(pair):
            me, other = pair
            return me - other
        return self.accumulate(other_stats, destination, operator=subtract)

    def is_all_zero(self):
        """Return True when every field in members_offsets equals 0."""
        return all(getattr(self, name) == 0
                   for name, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats whose fields are all 0, without decoding a buffer."""
        # __new__ skips __init__, which would require a taskstats buffer.
        stats = Stats.__new__(Stats)
        for name, _ in Stats.members_offsets:
            setattr(stats, name, 0)
        return stats
107 # Netlink usage for taskstats
110 TASKSTATS_CMD_GET = 1
111 TASKSTATS_CMD_ATTR_PID = 1
113 class TaskStatsNetlink(object):
114 # Keep in sync with format_stats() and pinfo.did_some_io()
116 def __init__(self, options):
117 self.options = options
118 self.connection = Connection(NETLINK_GENERIC)
119 controller = Controller(self.connection)
120 self.family_id = controller.get_family_id('TASKSTATS')
122 def build_request(self, tid):
123 return GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
124 attrs=[U32Attr(TASKSTATS_CMD_ATTR_PID, tid)],
125 flags=NLM_F_REQUEST)
127 def get_single_task_stats(self, thread):
128 thread.task_stats_request.send(self.connection)
129 try:
130 reply = self.connection.recv()
131 except OSError, e:
132 if e.errno == errno.ESRCH:
133 # OSError: Netlink error: No such process (3)
134 return
135 raise
136 if len(reply.payload) < 292:
137 # Short reply
138 return
139 reply_data = reply.payload[20:]
141 reply_length, reply_type = struct.unpack('HH', reply.payload[4:8])
142 reply_version = struct.unpack('H', reply.payload[20:22])[0]
143 assert reply_length >= 288
144 assert reply_type == TASKSTATS_CMD_ATTR_PID + 3
145 assert reply_version >= 4
146 return Stats(reply_data)
# PIDs manipulations

def find_uids(options):
    """Resolve options.users into the numeric list options.uids.

    options.users may be None.  An entry accepted by int() is used as a UID
    as-is; any other entry is looked up by name in the password database.
    Every unknown name is reported on stderr, and once all entries have been
    checked the process exits with status 1 if any name was unknown.  UIDs
    are only collected until the first unknown name.
    """
    options.uids = []
    failed = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                sys.stderr.write('%s %s\n' % ('Unknown user:', user))
                failed = True
        if not failed:
            options.uids.append(uid)
    if failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode the byte string s as UTF-8.

    If s is not valid UTF-8, fall back to s.encode('string_escape'), so a
    printable escaped form is returned instead of raising.
    """
    try:
        decoded = s.decode('utf-8')
    except UnicodeDecodeError:
        decoded = s.encode('string_escape')
    return decoded
class ThreadInfo(DumpableObject):
    """Stats for a single thread: its latest totals and the change since
    the previous sample."""

    def __init__(self, tid, taskstats_connection):
        self.tid = tid
        # Set before each refresh; cleared once fresh stats arrive.
        self.mark = True
        self.stats_total = None
        # Bare Stats whose fields are filled in by update_stats().
        self.stats_delta = Stats.__new__(Stats)
        # Prebuilt once, then resent on every refresh.
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return this thread's I/O priority as reported by ioprio.get()."""
        return ioprio.get(self.tid)

    def update_stats(self, stats):
        """Store stats as the new totals and the change since the last call.

        On the first call there is no previous sample, so stats is compared
        with itself and the resulting delta is zero.
        """
        previous = self.stats_total or stats
        stats.delta(previous, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""
    def __init__(self, pid):
        self.pid = pid
        # Cached owner UID, resolved lazily by get_uid().
        self.uid = None
        # Cached user name for self.uid, reset whenever the UID changes.
        self.user = None
        self.threads = {} # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Return whether this process passes the PID and user filters."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the UID owning /proc/<pid>, or None if the process is gone.

        A non-zero UID is cached and returned directly on later calls.
        """
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid(): drop the stale user name and
            # remember the new UID. Storing it is what lets the early return
            # above and get_user()'s name cache actually take effect.
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the user name for get_uid() (cached), or '{none}'."""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except KeyError:
                # No passwd entry: show the numeric UID instead.
                self.user = str(uid)
        return self.user or '{none}'

    def get_proc_status_name(self):
        """Return '[name]' taken from the 'Name:' line of /proc/<pid>/status.

        Returns '{no such process}' if the file cannot be opened and
        '{no name}' if its first line carries no name.
        """
        try:
            proc_status = open('/proc/%d/status' % self.pid)
        except IOError:
            return '{no such process}'
        try:
            first_line = proc_status.readline()
        finally:
            # Close now rather than leaking the descriptor until collection.
            proc_status.close()
        prefix = 'Name:\t'
        if first_line.startswith(prefix):
            name = first_line[len(prefix):].strip()
        else:
            name = ''
        if name:
            return '[%s]' % name
        return '{no name}'

    def get_cmdline(self):
        """Return the process command line with the executable's directory
        stripped, falling back to the status name when the cmdline is empty.
        At most 4096 bytes of /proc/<pid>/cmdline are read."""
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            return self.get_proc_status_name()
        # Arguments are NUL-separated.
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            # Show only the base name of an absolute executable path.
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline)

    def did_some_io(self, accumulated):
        """Return whether the accumulated stats (accumulated=True) or any
        thread's latest delta is non-zero."""
        if accumulated:
            return not self.stats_accum.is_all_zero()
        return not all(t.stats_delta.is_all_zero() for
                       t in self.threads.itervalues())

    def get_ioprio(self):
        """Return the threads' common I/O priority, or '?' unless exactly one
        distinct priority is found."""
        priorities = set(t.get_ioprio() for t in self.threads.itervalues())
        if len(priorities) == 1:
            return priorities.pop()
        return '?'

    def ioprio_sort_key(self):
        """Return ioprio.sort_key() of get_ioprio()."""
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating and registering it first
        if it is not known yet."""
        thread = self.threads.get(tid, None)
        if thread is None:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Drop threads that are still marked and aggregate the others' deltas.

        Returns False when no thread is left, True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        # Iterate over a snapshot because entries are deleted inside the loop.
        for tid, thread in list(self.threads.items()):
            if thread.mark:
                del self.threads[tid]
            else:
                stats_delta.accumulate(thread.stats_delta, stats_delta)

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # Delays are averaged over the threads while byte counters stay
        # summed. Floor division gives the same result as Python 2's '/' on
        # these integer counters.
        stats_delta.blkio_delay_total //= nr_threads
        stats_delta.swapin_delay_total //= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """The set of monitored processes, refreshed from taskstats."""
    def __init__(self, taskstats_connection, options):
        # {pid: ProcessInfo}
        self.processes = {}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Either get the specified PID from self.processes or build a new
        ProcessInfo if we see this PID for the first time.

        Returns None when the process is filtered out by is_monitored().
        """
        process = self.processes.get(pid, None)
        if process is None:
            process = ProcessInfo(pid)
            self.processes[pid] = process

        if process.is_monitored(self.options):
            return process
        return None

    def list_tgids(self):
        """Yield the IDs to scan: options.pids when given, otherwise every
        /proc/<pid> (processes mode) or /proc/<pid>/task/<tid> entry."""
        if self.options.pids:
            for pid in self.options.pids:
                yield pid
            # Stop here: falling through would also glob all of /proc and
            # yield each requested PID a second time, and that second
            # taskstats query would overwrite its thread's delta with a
            # near-zero one.
            return

        pattern = '/proc/[0-9]*'
        if not self.options.processes:
            pattern += '/task/*'

        for path in glob.iglob(pattern):
            yield int(os.path.basename(path))

    def list_tids(self, tgid):
        """Return the thread IDs to query for tgid.

        In thread mode this is [tgid] itself. In processes mode it is the
        entries of /proc/<tgid>/task, restricted to options.pids if given,
        or [] when that directory cannot be listed.
        """
        if not self.options.processes:
            return [tgid]

        try:
            tids = [int(tid) for tid in os.listdir('/proc/%d/task' % tgid)]
        except OSError:
            return []

        if self.options.pids:
            tids = list(set(self.options.pids).intersection(tids))

        return tids

    def update_process_counts(self):
        """Query taskstats for every listed thread and return
        self.vmstat.delta().

        Each thread that answered gets fresh stats and its mark cleared.
        """
        new_timestamp = time.time()
        self.duration = new_timestamp - self.timestamp
        self.timestamp = new_timestamp

        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, self.taskstats_connection)
                stats = self.taskstats_connection.get_single_task_stats(thread)
                if stats:
                    thread.update_stats(stats)
                    thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Refresh every process and forget those without live threads.

        Returns the value of update_process_counts(), i.e.
        self.vmstat.delta().
        """
        # Mark every thread; update_process_counts() unmarks those that still
        # answer, and ProcessInfo.update_stats() drops the rest.
        for process in self.processes.itervalues():
            for thread in process.threads.itervalues():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        # Iterate over a snapshot because entries are deleted inside the loop.
        for pid, process in list(self.processes.items()):
            if not process.update_stats():
                del self.processes[pid]

        return total_read_and_write

    def clear(self):
        """Forget every known process."""
        self.processes = {}