e623c37604b5fd93cb1d940bf36a26c6b4382e17
[iotop.git] / iotop / data.py
blobe623c37604b5fd93cb1d940bf36a26c6b4382e17
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
19 # Allow printing with same syntax in Python 2/3
20 from __future__ import print_function
22 import errno
23 import os
24 import pprint
25 import pwd
26 import stat
27 import struct
28 import sys
29 import time
# Check for requirements:
#   o Linux >= 2.6.20 with I/O accounting and VM event counters

# The presence of /proc/self/io is used as the indicator that the running
# kernel exposes per-task I/O accounting.
ioaccounting = os.path.exists('/proc/self/io')

# Importing and instantiating VmStat is used as the indicator that VM event
# counters are available.
try:
    from iotop.vmstat import VmStat
    vmstat_f = VmStat()
except Exception:
    # Deliberately not a bare "except:": KeyboardInterrupt and SystemExit must
    # propagate instead of being reported as a missing kernel feature.
    vm_event_counters = False
else:
    vm_event_counters = True

if not ioaccounting or not vm_event_counters:
    # Tell the user which requirement is missing, then abort the import.
    print('Could not run iotop as some of the requirements are not met:')
    print('- Linux >= 2.6.20 with')
    if not ioaccounting:
        print(' - I/O accounting support ' \
              '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
              'CONFIG_TASK_IO_ACCOUNTING)')
    if not vm_event_counters:
        print(' - VM event counters (CONFIG_VM_EVENT_COUNTERS)')
    sys.exit(1)
57 from iotop import ioprio, vmstat
58 from iotop.netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
59 from iotop.genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class giving its subclasses a repr() that lists every attribute."""

    def __repr__(self):
        # str(type(self)) ends with '>'; drop it so that the pretty-printed
        # attribute dictionary can be placed before the closing bracket.
        type_tag = str(type(self))[:-1]
        attributes = pprint.pformat(vars(self))
        return '%s: %s>' % (type_tag, attributes)
68 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """A handful of 64-bit counters picked out of a raw taskstats buffer.

    Each counter named in members_offsets becomes an instance attribute.
    """

    # (attribute name, byte offset inside the taskstats buffer); every field
    # is read as one native-order unsigned 64-bit integer.
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Becomes True as soon as any decoded buffer has a non-zero
    # blkio_delay_total.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        """Decode every field of members_offsets from task_stats_buffer."""
        fields = vars(self)
        for field_name, start in Stats.members_offsets:
            raw = task_stats_buffer[start:start + 8]
            fields[field_name] = struct.unpack('Q', raw)[0]

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled
        # in the kernel: a non-zero delay has been seen at least once.
        if self.blkio_delay_total != 0:
            Stats.has_blkio_delay_total = True

    def accumulate(self, other_stats, destination, coeff=1):
        """Store self + coeff * other_stats, field by field, in destination."""
        mine = vars(self)
        theirs = vars(other_stats)
        target = vars(destination)
        for field_name, _ in Stats.members_offsets:
            target[field_name] = mine[field_name] + coeff * theirs[field_name]

    def delta(self, other_stats, destination):
        """Store self - other_stats, field by field, in destination."""
        return self.accumulate(other_stats, destination, coeff=-1)

    def is_all_zero(self):
        """Return True when every field of members_offsets equals zero."""
        values = vars(self)
        return not any(values[field_name] != 0
                       for field_name, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats whose fields are all 0, without decoding a buffer."""
        zeroed = Stats.__new__(Stats)
        fields = vars(zeroed)
        for field_name, _ in Stats.members_offsets:
            fields[field_name] = 0
        return zeroed
# Netlink usage for taskstats

# Numeric identifiers exchanged with the kernel over generic netlink.
# NOTE(review): these presumably mirror the enums of the kernel header
# <linux/taskstats.h> -- confirm against the header before changing them.

# Command placed in the request message by TaskStatsNetlink.build_request().
TASKSTATS_CMD_GET = 1
# Request attribute that carries the thread id being queried.
TASKSTATS_CMD_ATTR_PID = 1
# Reply attribute whose nested content is searched for the statistics.
TASKSTATS_TYPE_AGGR_PID = 4
# Not referenced by the code visible in this file.
TASKSTATS_TYPE_PID = 1
# Nested reply attribute whose .data is the raw taskstats buffer.
TASKSTATS_TYPE_STATS = 3
class TaskStatsNetlink(object):
    """Queries per-thread statistics from the TASKSTATS netlink family."""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        """Open a generic netlink connection and resolve the family id."""
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Return the (reusable) request message asking for tid's stats."""
        pid_attribute = U32Attr(TASKSTATS_CMD_ATTR_PID, tid)
        return GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                           attrs=[pid_attribute],
                           flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send thread's prepared request and decode the reply.

        Returns a Stats object, or None when the thread no longer exists,
        when the reply has no aggregated-PID attribute, or when the stats
        payload is shorter than 272 bytes.  Any OSError other than ESRCH is
        re-raised.
        """
        thread.task_stats_request.send(self.connection)
        try:
            answer = GeNlMessage.recv(self.connection)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None

        aggregated = [value for kind, value in answer.attrs.items()
                      if kind == TASKSTATS_TYPE_AGGR_PID]
        if not aggregated:
            return None
        nested = aggregated[0].nested()

        payload = nested[TASKSTATS_TYPE_STATS].data
        if len(payload) < 272:
            # Short reply
            return None
        # The first two bytes of the payload hold the taskstats version.
        version = struct.unpack('H', payload[:2])[0]
        assert version >= 4
        return Stats(payload)
168 # PIDs manipulations
def find_uids(options):
    """Fill options.uids with the numeric UID of each entry of options.users.

    An entry is either a number (used as is) or a user name looked up in the
    password database.  Every unknown name is reported on stderr; if there
    was at least one, the program exits with status 1 after the whole list
    has been examined.
    """
    options.uids = []
    failed = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            # Not a number: treat it as a login name.
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                print('Unknown user:', user, file=sys.stderr)
                failed = True
        if failed:
            # Once a lookup has failed nothing more is recorded; the loop
            # only goes on to report the remaining unknown names.
            continue
        options.uids.append(uid)
    if failed:
        sys.exit(1)
def parse_proc_pid_status(pid):
    """Return the content of /proc/<pid>/status as a {field: value} dict.

    Each line is split on the first ':\\t'; the value is stripped of
    surrounding whitespace.  Lines without that separator are skipped.  If
    the file cannot be opened or read (typically because the process is
    gone) whatever was parsed so far -- possibly nothing -- is returned.
    """
    result_dict = {}
    try:
        # The context manager closes the file deterministically instead of
        # leaving the descriptor open until garbage collection.
        with open('/proc/%d/status' % pid) as status_file:
            for line in status_file:
                key, separator, value = line.partition(':\t')
                if not separator:
                    # Not a "Key:<TAB>value" line, nothing to record.
                    continue
                result_dict[key] = value.strip()
    except IOError:
        pass  # No such process
    return result_dict
def safe_utf8_decode(s):
    """Decode s from UTF-8 without ever failing on undecodable input.

    * Objects that have no decode() method (e.g. Python 3 str) are returned
      unchanged.
    * Valid UTF-8 byte strings are decoded.
    * Invalid byte strings are returned with the offending bytes written as
      backslash escapes.
    """
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError:
        try:
            # Python 2 byte strings: escape the non-printable bytes.
            return s.encode('string_escape')
        except (AttributeError, LookupError):
            # Python 3 bytes have no encode() (and no 'string_escape'
            # codec): obtain the equivalent escaping while decoding.
            return s.decode('utf-8', 'backslashreplace')
    except AttributeError:
        return s
class ThreadInfo(DumpableObject):
    """Bookkeeping for one thread: last sample and difference to the previous
    one."""

    def __init__(self, tid, taskstats_connection):
        self.tid = tid
        # Set back to True before each refresh; cleared once fresh stats have
        # been received for this thread.
        self.mark = True
        # Most recent Stats sample, None until the first one arrives.
        self.stats_total = None
        # Receives the difference between the last two samples; created
        # without running Stats.__init__, so it has no fields yet.
        self.stats_delta = Stats.__new__(Stats)
        # The request is built once and re-sent on every refresh.
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return this thread's I/O priority as reported by ioprio.get()."""
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority class and data to this thread."""
        return ioprio.set_ioprio(ioprio.IOPRIO_WHO_PROCESS, self.tid,
                                 ioprio_class, ioprio_data)

    def update_stats(self, stats):
        """Record stats as the latest sample and refresh stats_delta."""
        first_sample = not self.stats_total
        if first_sample:
            # Nothing to compare with yet: the delta is computed against the
            # sample itself.
            self.stats_total = stats
        stats.delta(self.stats_total, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""

    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        # Activity during the last refresh, summed over the threads.
        self.stats_delta = Stats.build_all_zero()
        # Running total of every stats_delta since creation.
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Return False when the pid or user filters of options exclude this
        process, True otherwise."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the owner UID of /proc/<pid>, or None if it is gone.

        A non-zero UID is cached; None and 0 are looked up again each time.
        """
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the
        # process runs as root so it can change its UID. In both cases it
        # means we have to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid(): forget the cached user name
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the owner's login name, the UID as a string when it has no
        passwd entry, or '{none}' when the UID is unknown."""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except (KeyError, AttributeError):
                self.user = str(uid)
        return self.user or '{none}'

    def get_cmdline(self):
        """Return a printable command line for this process or thread.

        * '{no such process}' when /proc/<pid>/cmdline cannot be read;
        * '[name]' (or '{no name}') when the command line is empty;
        * otherwise the arguments joined by spaces, with the directory part
          of an absolute argv[0] removed, followed by ' [thread name]' when
          this is a secondary thread named differently from its main thread.
        """
        # A process may exec, so we must always reread its cmdline
        try:
            # "with" guarantees that the descriptor is released right away;
            # this method runs for every displayed process on each refresh.
            with open('/proc/%d/cmdline' % self.pid) as proc_cmdline:
                cmdline = proc_cmdline.read(4096)
        except IOError:
            return '{no such process}'
        proc_status = parse_proc_pid_status(self.pid)
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            proc_status_name = proc_status.get('Name', '')
            if proc_status_name:
                return '[%s]' % proc_status_name
            return '{no name}'
        suffix = ''
        tgid = int(proc_status.get('Tgid', self.pid))
        if tgid != self.pid:
            # Not the main thread, maybe it has a custom name
            tgid_name = parse_proc_pid_status(tgid).get('Name', '')
            thread_name = proc_status.get('Name', '')
            if thread_name != tgid_name:
                suffix += ' [%s]' % thread_name
        # Arguments are separated by NUL characters in /proc/PID/cmdline
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            # Only keep the basename of an absolute executable path
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline + suffix)

    def did_some_io(self, accumulated):
        """Return True if any counter is non-zero: in the running total when
        accumulated is true, else in the last delta of at least one thread."""
        if accumulated:
            return not self.stats_accum.is_all_zero()
        return any(not t.stats_delta.is_all_zero()
                   for t in self.threads.values())

    def get_ioprio(self):
        """Return the I/O priority shared by all threads, or '?dif' when
        they do not all report the same one (or there is no thread)."""
        priorities = set(t.get_ioprio() for t in self.threads.values())
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority to every thread of the process."""
        for thread in self.threads.values():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        """Return the key used to sort processes by I/O priority."""
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating and registering it when it
        is seen for the first time."""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Drop the threads still marked and aggregate the deltas of the rest.

        Returns False when no thread is left (the caller then discards this
        process), True otherwise.
        """
        # Threads whose mark was not cleared got no fresh stats this round.
        alive = dict((tid, thread) for tid, thread in self.threads.items()
                     if not thread.mark)
        stats_delta = Stats.build_all_zero()
        for thread in alive.values():
            stats_delta.accumulate(thread.stats_delta, stats_delta)
        self.threads = alive

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # The two delay counters are averaged over the threads rather than
        # summed like the byte counters.
        stats_delta.blkio_delay_total /= nr_threads
        stats_delta.swapin_delay_total /= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """The set of monitored processes together with their refresh logic."""

    def __init__(self, taskstats_connection, options):
        # {pid: ProcessInfo}
        self.processes = {}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Either get the specified PID from self.processes or build a new
        ProcessInfo if we see this PID for the first time.

        Returns None when the process is filtered out by the options."""
        known = self.processes.get(pid, None)
        if not known:
            known = ProcessInfo(pid)
            self.processes[pid] = known

        if not known.is_monitored(self.options):
            return None
        return known

    def list_tgids(self):
        """Return the ids to walk through: options.pids when given, else the
        numeric entries of /proc (processes mode) or all their task ids."""
        explicit = self.options.pids
        if explicit:
            return explicit

        numeric_entries = [entry for entry in os.listdir('/proc')
                           if '0' <= entry[0] <= '9']
        if self.options.processes:
            return [int(entry) for entry in numeric_entries]

        all_tids = []
        for entry in numeric_entries:
            try:
                all_tids.extend(map(int, os.listdir('/proc/' + entry + '/task')))
            except OSError:
                # The PID went away
                continue
        return all_tids

    def list_tids(self, tgid):
        """Return the thread ids to query for tgid: tgid itself in threads
        mode, else the content of /proc/<tgid>/task, restricted to
        options.pids when given ([] if the process is gone)."""
        if not self.options.processes:
            return [tgid]

        try:
            task_entries = os.listdir('/proc/%d/task' % tgid)
        except OSError:
            return []
        found = list(map(int, task_entries))

        wanted = self.options.pids
        if wanted:
            found = list(set(wanted).intersection(set(found)))

        return found

    def update_process_counts(self):
        """Sample every monitored thread once.

        Returns ((total_read, total_write), self.vmstat.delta()) where the
        totals sum the per-thread byte deltas of this round."""
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        read_sum = 0
        write_sum = 0

        connection = self.taskstats_connection
        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, connection)
                sample = connection.get_single_task_stats(thread)
                if not sample:
                    # No answer: the thread keeps its mark and gets dropped
                    # by ProcessInfo.update_stats().
                    continue
                thread.update_stats(sample)
                read_sum += thread.stats_delta.read_bytes
                write_sum += thread.stats_delta.write_bytes
                thread.mark = False
        return (read_sum, write_sum), self.vmstat.delta()

    def refresh_processes(self):
        """Run one full refresh and forget the processes left without any
        thread; returns what update_process_counts() returned."""
        # Mark everything; the threads that still answer get unmarked below.
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        # update_stats() aggregates each process and tells whether to keep it.
        self.processes = dict((pid, process)
                              for pid, process in self.processes.items()
                              if process.update_stats())

        return total_read_and_write

    def clear(self):
        """Forget every known process."""
        self.processes = {}