Put kernel threads between square brackets
[iotop.git] / iotop / data.py
blob95e1e8b6f9032006d6c55c049eb4619c4c91143d
1 import errno
2 import glob
3 import os
4 import pprint
5 import pwd
6 import socket
7 import stat
8 import struct
9 import sys
10 import time
12 from iotop import ioprio, vmstat
13 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
14 from genetlink import Controller, GeNlMessage
# Check for requirements:
#   o Python >= 2.5 for AF_NETLINK sockets
#   o Linux >= 2.6.20 with I/O accounting

# The presence of socket.NETLINK_ROUTE is what stands in for the Python
# version check.
python25 = hasattr(socket, 'NETLINK_ROUTE')

# The presence of /proc/self/io is what stands in for the kernel check.
ioaccounting = os.path.exists('/proc/self/io')

if not (python25 and ioaccounting):
    def boolean2string(boolean):
        """Word shown to the user for the outcome of a requirement check"""
        if boolean:
            return 'Found'
        return 'Not found'

    report = [
        'Could not run iotop as some of the requirements are not met:',
        '%s %s' % ('- Python >= 2.5 for AF_NETLINK support:',
                   boolean2string(python25)),
        '%s %s' % ('- Linux >= 2.6.20 with I/O accounting support '
                   '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
                   'CONFIG_TASK_IO_ACCOUNTING):',
                   boolean2string(ioaccounting)),
    ]
    # One report entry per line on stdout, then give up with status 1.
    sys.stdout.write('\n'.join(report) + '\n')
    sys.exit(1)
class DumpableObject(object):
    """Base class whose repr shows the object's type and all of its attributes"""

    def __repr__(self):
        # Drop the last character of the type's string form so that the
        # attribute dump can be placed before a single closing '>'.
        type_prefix = str(type(self))[:-1]
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (type_prefix, attributes)
# Interesting fields in a taskstats output

class Stats(DumpableObject):
    """Set of counters, one attribute per entry of members_offsets"""

    # (attribute name, byte offset of its 8-byte value in the buffer given to
    # __init__)
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    def __init__(self, task_stats_buffer):
        """Read every member from task_stats_buffer as a native 'Q' (unsigned
        64-bit) value located at its offset"""
        for member, start in Stats.members_offsets:
            (value,) = struct.unpack('Q', task_stats_buffer[start:start + 8])
            setattr(self, member, value)

    def accumulate(self, other_stats, operator=sum):
        """Returns a new Stats object built from operator(self, other_stats)"""
        # __new__ bypasses __init__: there is no buffer to decode here.
        combined = Stats.__new__(Stats)
        for member, _ in Stats.members_offsets:
            pair = (getattr(self, member), getattr(other_stats, member))
            setattr(combined, member, operator(pair))
        return combined

    def delta(self, other_stats):
        """Returns self - other_stats"""
        return self.accumulate(other_stats,
                               operator=lambda pair: pair[0] - pair[1])

    def is_all_zero(self):
        """True when every member equals 0"""
        return not any(getattr(self, member) != 0
                       for member, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Returns a Stats object whose members are all 0"""
        zeroed = Stats.__new__(Stats)
        for member, _ in Stats.members_offsets:
            setattr(zeroed, member, 0)
        return zeroed
# Netlink usage for taskstats

TASKSTATS_CMD_GET = 1
TASKSTATS_CMD_ATTR_PID = 1

class TaskStatsNetlink(object):
    """Queries the TASKSTATS generic netlink family for per-task Stats"""
    # Keep in sync with human_stats(stats, duration) and pinfo.did_some_io()

    def __init__(self, options):
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def get_single_task_stats(self, pid):
        """Ask for the stats of task pid and return them as a Stats object.

        Returns None when the request fails with ESRCH or when the reply
        payload is shorter than 292 bytes. Any other OSError is re-raised.
        """
        query = GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                            attrs=[U32Attr(TASKSTATS_CMD_ATTR_PID, pid)],
                            flags=NLM_F_REQUEST)
        query.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            # sys.exc_info() gives the exception instance without relying on
            # version-specific "except ... , e" / "except ... as e" syntax.
            if sys.exc_info()[1].errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return
        payload = reply.payload
        if len(payload) < 292:
            # Short reply
            return

        # Two unsigned shorts at bytes 4-7, and one at bytes 20-21 which is
        # also where the buffer handed to Stats starts.
        reply_length, reply_type = struct.unpack('HH', payload[4:8])
        (reply_version,) = struct.unpack('H', payload[20:22])
        assert reply_length >= 288
        # NOTE(review): presumably the aggregated-PID attribute type; confirm
        # against linux/taskstats.h.
        assert reply_type == TASKSTATS_CMD_ATTR_PID + 3
        assert reply_version >= 4
        return Stats(payload[20:])
# PIDs manipulations

def find_uids(options):
    """Build options.uids from options.users by resolving usernames to UIDs

    Entries that parse as integers are taken as UIDs directly; the others are
    looked up in the password database. Every unknown name is reported on
    stderr and the program then exits with status 1.
    """
    resolved = options.uids = []
    failed = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                sys.stderr.write('%s %s\n' % ('Unknown user:', user))
                failed = True
        # After the first failure nothing more is collected: the loop only
        # goes on so that every unknown name gets reported.
        if not failed:
            resolved.append(uid)
    if failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode s as UTF-8, or return it passed through the 'string_escape'
    codec when it is not valid UTF-8"""
    try:
        decoded = s.decode('utf-8')
    except UnicodeDecodeError:
        return s.encode('string_escape')
    else:
        return decoded
class ThreadInfo(DumpableObject):
    """Stats for a single thread"""

    def __init__(self, tid):
        self.tid = tid
        # Set to True on creation; this class never changes it afterwards.
        self.mark = True
        # Both stay None until the first update_stats() call.
        self.stats_total = None
        self.stats_delta = None

    def get_ioprio(self):
        """I/O priority of this thread as reported by ioprio.get()"""
        return ioprio.get(self.tid)

    def update_stats(self, stats):
        """Record stats as the new totals and store in stats_delta the
        difference with the previous totals"""
        # With no previous totals the new stats are compared to themselves.
        previous = self.stats_total or stats
        self.stats_delta = stats.delta(previous)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""

    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Tell whether this entry passes the PID and UID filters of options"""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the UID owning /proc/<pid>, or None if it cannot be stat'ed

        A non-zero UID is remembered in self.uid and returned directly on
        later calls.
        """
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            # Remember the new UID: otherwise the shortcut above never hits and
            # the cached user name is thrown away on every single call.
            self.uid = uid
        return uid

    def get_user(self):
        """Return the user name for get_uid(), the UID as a string when it has
        no passwd entry, or '{none}' when no UID could be found"""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except KeyError:
                self.user = str(uid)
        return self.user or '{none}'

    def get_proc_status_name(self):
        """Return the name found on the first line of /proc/<pid>/status,
        between square brackets"""
        try:
            proc_status = open('/proc/%d/status' % self.pid)
            try:
                first_line = proc_status.readline()
            finally:
                # Do not leave the descriptor to the garbage collector.
                proc_status.close()
        except IOError:
            # Covers a failure to open the file as well as a failure to read it
            return '{no such process}'
        prefix = 'Name:\t'
        if first_line.startswith(prefix):
            name = first_line[len(prefix):].strip()
        else:
            name = ''
        if name:
            return '[%s]' % name
        return '{no name}'

    def get_cmdline(self):
        """Return the command line (at most 4096 bytes are read) with the
        directory part of an absolute first argument removed; fall back on
        get_proc_status_name() when the command line is empty"""
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                # Do not leave the descriptor to the garbage collector.
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            return self.get_proc_status_name()
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline)

    def did_some_io(self):
        """True when at least one thread has a stats_delta that is not all
        zero"""
        return not all(t.stats_delta.is_all_zero()
                       for t in self.threads.values())

    def get_ioprio(self):
        """Return the I/O priority shared by all the threads, or '?' when
        they do not all report the same one"""
        priorities = set(t.get_ioprio() for t in self.threads.values())
        if len(priorities) == 1:
            return priorities.pop()
        return '?'

    def ioprio_sort_key(self):
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid):
        """Return the ThreadInfo for tid, creating and registering it first
        if this tid is not known yet"""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Drop the threads that are still marked and aggregate the deltas of
        the others into stats_delta and stats_accum.

        Returns False, leaving the stats untouched, when no thread remains;
        True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        # Iterate over a copy: entries are deleted along the way.
        for tid, thread in list(self.threads.items()):
            if thread.mark:
                del self.threads[tid]
            else:
                stats_delta = stats_delta.accumulate(thread.stats_delta)

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # The two delay members become per-thread averages. Floor division is
        # spelled out so the result stays an integer.
        stats_delta.blkio_delay_total //= nr_threads
        stats_delta.swapin_delay_total //= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum = self.stats_accum.accumulate(self.stats_delta)

        return True
class ProcessList(DumpableObject):
    """The ProcessInfo objects being tracked, keyed by PID, and the logic to
    refresh their stats"""

    def __init__(self, taskstats_connection, options):
        # {pid: ProcessInfo}
        self.processes = {}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Either get the specified PID from self.processes or build a new
        ProcessInfo if we see this PID for the first time

        Returns None when the process is not monitored.
        """
        process = self.processes.get(pid, None)
        if not process:
            process = ProcessInfo(pid)
            self.processes[pid] = process

        if process.is_monitored(self.options):
            return process

    def list_tgids(self):
        """Yield the IDs to look at: options.pids when it is set, otherwise
        the numeric entries of /proc (or of /proc/*/task when
        options.processes is not set)"""
        if self.options.pids:
            for pid in self.options.pids:
                yield pid
            # Stop here: also walking /proc would yield the requested IDs a
            # second time, and that second sample in the same refresh would
            # replace their delta by one covering almost no time at all.
            return

        pattern = '/proc/[0-9]*'
        if not self.options.processes:
            pattern += '/task/*'

        for path in glob.iglob(pattern):
            yield int(os.path.basename(path))

    def list_tids(self, tgid):
        """Return the thread IDs to sample for tgid: tgid itself when
        options.processes is not set, otherwise the entries of
        /proc/<tgid>/task restricted to options.pids when that is set"""
        if not self.options.processes:
            return [tgid]

        try:
            tids = [int(tid) for tid in os.listdir('/proc/%d/task' % tgid)]
        except OSError:
            return []

        if self.options.pids:
            tids = list(set(self.options.pids).intersection(set(tids)))

        return tids

    def update_process_counts(self):
        """Sample the stats of every monitored thread and return
        self.vmstat.delta()

        Also updates self.duration and self.timestamp.
        """
        new_timestamp = time.time()
        self.duration = new_timestamp - self.timestamp
        self.timestamp = new_timestamp

        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid)
                stats = self.taskstats_connection.get_single_task_stats(tid)
                if stats:
                    thread.update_stats(stats)
                    # Only threads that returned stats get unmarked.
                    thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Run a full refresh and return what update_process_counts() returned

        Every thread is marked first, and processes whose update_stats()
        returns False afterwards are forgotten.
        """
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        # Iterate over a copy: entries are deleted along the way.
        for pid, process in list(self.processes.items()):
            if not process.update_stats():
                del self.processes[pid]

        return total_read_and_write

    def clear(self):
        """Forget every tracked process"""
        self.processes = {}