Fix crash when running under python3.
[iotop.git] / iotop / data.py
blob0e43a2c0092c2321d2ee477bacb9f4bfff1b4a90
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
19 # Allow printing with same syntax in Python 2/3
20 from __future__ import print_function
22 import errno
23 import os
24 import pprint
25 import pwd
26 import stat
27 import struct
28 import sys
29 import time
# Check for requirements:
#   o Linux >= 2.6.20 with I/O accounting and VM event counters
#   o Python >= 2.5 or Python 2.4 + ctypes

# Per-task I/O accounting is exposed by the kernel as /proc/<pid>/io.
ioaccounting = os.path.exists('/proc/self/io')

try:
    import ctypes
except ImportError:
    has_ctypes = False
else:
    has_ctypes = True

try:
    from iotop.vmstat import VmStat
    vmstat_f = VmStat()
except Exception:
    # Any error while importing or instantiating VmStat means the VM event
    # counters cannot be used. A bare 'except:' would also swallow
    # KeyboardInterrupt and SystemExit, so only Exception subclasses are
    # caught here.
    vm_event_counters = False
else:
    vm_event_counters = True

if not ioaccounting or not has_ctypes or not vm_event_counters:
    # Report every missing requirement before giving up.
    print('Could not run iotop as some of the requirements are not met:')
    if not ioaccounting or not vm_event_counters:
        print('- Linux >= 2.6.20 with')
        if not ioaccounting:
            print(' - I/O accounting support '
                  '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
                  'CONFIG_TASK_IO_ACCOUNTING)')
        if not vm_event_counters:
            print(' - VM event counters (CONFIG_VM_EVENT_COUNTERS)')
    if not has_ctypes:
        print('- Python >= 2.5 or Python 2.4 with the ctypes module')

    sys.exit(1)
69 from iotop import ioprio, vmstat
70 from iotop.netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
71 from iotop.genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class whose repr() shows the class followed by a pretty-printed
    dump of all instance attributes, for easy introspection when printed."""

    def __repr__(self):
        # str(type(self)) ends with '>'; strip it so the attribute dump can
        # be placed inside the same angle brackets.
        class_text = str(type(self))
        attributes_text = pprint.pformat(self.__dict__)
        return '{0}: {1}>'.format(class_text[:-1], attributes_text)
# Interesting fields in a taskstats output

class Stats(DumpableObject):
    """Counters decoded from a raw taskstats payload.

    Each name in members_offsets becomes an attribute holding the native
    unsigned 64-bit value ('Q') read at the listed byte offset.
    """
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Class-wide flag: becomes True once any task reports a nonzero
    # blkio_delay_total, and then stays True.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        attributes = self.__dict__
        for field, start in Stats.members_offsets:
            chunk = task_stats_buffer[start:start + 8]
            (attributes[field],) = struct.unpack('Q', chunk)

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
        # the kernel.
        Stats.has_blkio_delay_total = (Stats.has_blkio_delay_total or
                                       self.blkio_delay_total != 0)

    def accumulate(self, other_stats, destination, coeff=1):
        """Set every counter of destination to self + coeff * other_stats.

        destination may be self; each field is read before it is written.
        """
        mine = self.__dict__
        theirs = other_stats.__dict__
        target = destination.__dict__
        for field, _ in Stats.members_offsets:
            target[field] = mine[field] + coeff * theirs[field]

    def delta(self, other_stats, destination):
        """Set every counter of destination to self - other_stats."""
        return self.accumulate(other_stats, destination, coeff=-1)

    def is_all_zero(self):
        """Return True when every tracked counter equals zero."""
        values = self.__dict__
        return not any(values[field] != 0
                       for field, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats instance (no buffer parsing) with all counters 0."""
        zeroed = Stats.__new__(Stats)
        zeroed.__dict__.update((field, 0)
                               for field, _ in Stats.members_offsets)
        return zeroed
# Netlink usage for taskstats
# (numeric values presumably mirror the kernel's <linux/taskstats.h> enums)

# Command sent in each request message.
TASKSTATS_CMD_GET = 1
# Request attribute carrying the thread ID to query.
TASKSTATS_CMD_ATTR_PID = 1

# Attribute types found in replies.
TASKSTATS_TYPE_PID = 1
TASKSTATS_TYPE_STATS = 3
TASKSTATS_TYPE_AGGR_PID = 4
class TaskStatsNetlink(object):
    """Fetches per-thread taskstats records over a generic netlink socket."""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        # Look up the family ID registered under the name 'TASKSTATS'.
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Return a TASKSTATS_CMD_GET request message for thread tid."""
        pid_attribute = U32Attr(TASKSTATS_CMD_ATTR_PID, tid)
        return GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                           attrs=[pid_attribute], flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send thread's prebuilt request and decode the reply.

        Returns a Stats instance, or None if the task no longer exists
        (ESRCH), the reply has no TASKSTATS_TYPE_AGGR_PID attribute, or the
        stats payload is shorter than 272 bytes. Other OSErrors propagate.
        """
        thread.task_stats_request.send(self.connection)
        try:
            reply = GeNlMessage.recv(self.connection)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None

        for attr_type, attr_value in reply.attrs.items():
            if attr_type == TASKSTATS_TYPE_AGGR_PID:
                nested_attrs = attr_value.nested()
                break
        else:
            # No aggregate record in this reply
            return None

        payload = nested_attrs[TASKSTATS_TYPE_STATS].data
        # The last field Stats reads is cancelled_write_bytes: 264 + 8 bytes.
        if len(payload) < 272:
            # Short reply
            return None
        (version,) = struct.unpack('H', payload[:2])
        assert version >= 4
        return Stats(payload)
# PIDs manipulations

def find_uids(options):
    """Fill options.uids by resolving options.users to numeric UIDs.

    Each entry may be a numeric UID or a user name. Every unknown name is
    reported on stderr; if any was unknown, the program exits with status 1
    once all entries have been checked.
    """
    options.uids = []
    failed = False
    for user in options.users or []:
        try:
            resolved = int(user)
        except ValueError:
            try:
                resolved = pwd.getpwnam(user).pw_uid
            except KeyError:
                print('Unknown user:', user, file=sys.stderr)
                failed = True
                continue
        # After a failure, keep scanning only to report more unknown names.
        if not failed:
            options.uids.append(resolved)
    if failed:
        sys.exit(1)
def parse_proc_pid_status(pid):
    """Parse /proc/<pid>/status into a {field name: stripped value} dict.

    Returns an empty (or partial) dict if the process vanished or the file
    could not be read. Lines without a ':\\t' separator are skipped.
    """
    result_dict = {}
    if sys.version_info[0] >= 3:
        # Python 3 decodes text files; a process name may contain bytes that
        # are not valid UTF-8, which would raise UnicodeDecodeError and crash.
        # Substitute undecodable bytes instead.
        open_kwargs = {'encoding': 'utf-8', 'errors': 'replace'}
    else:
        open_kwargs = {}
    try:
        # 'with' guarantees the descriptor is closed (the original leaked it)
        with open('/proc/%d/status' % pid, **open_kwargs) as status_file:
            for line in status_file:
                key, separator, value = line.partition(':\t')
                if separator:
                    result_dict[key] = value.strip()
    except IOError:
        pass  # No such process
    return result_dict
def safe_utf8_decode(s):
    """Decode s from UTF-8 when it is a byte string; return other values as is.

    Bytes that are not valid UTF-8 never raise. Python 2 escapes them with
    the 'string_escape' codec. Python 3 has neither that codec nor
    bytes.encode(), so undecodable bytes are replaced with U+FFFD instead.
    """
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError:
        try:
            # Python 2 str: escape the offending bytes
            return s.encode('string_escape')
        except (AttributeError, LookupError):
            # Python 3 bytes: no .encode() and no 'string_escape' codec.
            # Without this fallback, the error escaped the handler and crashed.
            return s.decode('utf-8', 'replace')
    except AttributeError:
        # Already text (Python 3 str) or not a string at all
        return s
class ThreadInfo(DumpableObject):
    """Stats for a single thread: last cumulative sample and last delta."""

    def __init__(self, tid, taskstats_connection):
        self.tid = tid
        # Set to True by ProcessList.refresh_processes() and cleared by
        # ProcessList.update_process_counts()
        self.mark = True
        self.stats_total = None
        # Starts with no counters; update_stats() fills them in
        self.stats_delta = Stats.__new__(Stats)
        # Netlink request built once and re-sent on every refresh
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return this thread's I/O priority as reported by ioprio.get()."""
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority class/data to this thread only."""
        return ioprio.set_ioprio(ioprio.IOPRIO_WHO_PROCESS, self.tid,
                                 ioprio_class, ioprio_data)

    def update_stats(self, stats):
        """Store a new cumulative sample and the delta from the previous one.

        On the first sample the previous value is the sample itself, so the
        delta is all zeros.
        """
        previous = self.stats_total or stats
        self.stats_total = previous
        stats.delta(previous, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""
    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {} # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Return False if the PID or user filters in options exclude us."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the owner UID of /proc/PID, or None if the process is gone."""
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the user name owning the process (cached), the numeric UID
        as a string if it has no passwd entry, or '{none}' if unknown."""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except (KeyError, AttributeError):
                self.user = str(uid)
        return self.user or '{none}'

    def get_cmdline(self):
        """Return a printable command line for this process.

        Processes with an empty cmdline (probably kernel threads) are shown
        as '[Name]' using /proc/PID/status, or '{no name}'. A thread that is
        not the group leader gets its own name appended when it differs from
        the leader's. Returns '{no such process}' if the cmdline is unreadable.
        """
        # A process may exec, so we must always reread its cmdline
        if sys.version_info[0] >= 3:
            # Python 3 decodes text files; arguments are arbitrary bytes, so
            # strict decoding could raise UnicodeDecodeError and crash.
            # Substitute undecodable bytes instead.
            open_kwargs = {'encoding': 'utf-8', 'errors': 'replace'}
        else:
            open_kwargs = {}
        try:
            # 'with' guarantees the descriptor is closed (the original leaked it)
            with open('/proc/%d/cmdline' % self.pid,
                      **open_kwargs) as proc_cmdline:
                cmdline = proc_cmdline.read(4096)
        except IOError:
            return '{no such process}'
        proc_status = parse_proc_pid_status(self.pid)
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            proc_status_name = proc_status.get('Name', '')
            if proc_status_name:
                proc_status_name = '[%s]' % proc_status_name
            else:
                proc_status_name = '{no name}'
            return proc_status_name
        suffix = ''
        tgid = int(proc_status.get('Tgid', self.pid))
        if tgid != self.pid:
            # Not the main thread, maybe it has a custom name
            tgid_name = parse_proc_pid_status(tgid).get('Name', '')
            thread_name = proc_status.get('Name', '')
            if thread_name != tgid_name:
                suffix += ' [%s]' % thread_name
        # Arguments are NUL-separated; show only the basename of argv[0]
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline + suffix)

    def did_some_io(self, accumulated):
        """Tell whether any I/O was seen (accumulated or in the last delta)."""
        if accumulated:
            return not self.stats_accum.is_all_zero()
        for t in self.threads.values():
            if not t.stats_delta.is_all_zero():
                return True
        return False

    def get_ioprio(self):
        """Return the common I/O priority of all threads, or '?dif' if they
        differ."""
        priorities = set(t.get_ioprio() for t in self.threads.values())
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority to every known thread."""
        for thread in self.threads.values():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating and registering it if new."""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Sum the deltas of unmarked threads, drop marked threads, and add
        the sum to the accumulated stats.

        Returns False when no thread is left, True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        for tid, thread in self.threads.items():
            if not thread.mark:
                stats_delta.accumulate(thread.stats_delta, stats_delta)
        self.threads = dict((tid, thread) for tid, thread in
                            self.threads.items() if not thread.mark)

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # Delays are averaged over threads. Without
        # 'from __future__ import division', '/=' floors on Python 2 but
        # yields floats on Python 3.
        stats_delta.blkio_delay_total /= nr_threads
        stats_delta.swapin_delay_total /= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """All monitored processes, refreshed from taskstats on demand."""

    def __init__(self, taskstats_connection, options):
        self.processes = {}  # {pid: ProcessInfo}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # Take a first sample now: later refreshes only report deltas.
        self.update_process_counts()

    def get_process(self, pid):
        """Return the ProcessInfo for pid, creating it the first time pid is
        seen, or None if the options filter this process out."""
        process = self.processes.get(pid)
        if not process:
            process = self.processes[pid] = ProcessInfo(pid)
        return process if process.is_monitored(self.options) else None

    def list_tgids(self):
        """Return the IDs to scan: options.pids if given, otherwise every
        numeric /proc entry (processes mode) or every thread ID found under
        /proc/<pid>/task."""
        if self.options.pids:
            return self.options.pids

        numeric_entries = [entry for entry in os.listdir('/proc')
                           if '0' <= entry[0] <= '9']
        if self.options.processes:
            return [int(entry) for entry in numeric_entries]

        tids = []
        for tgid in numeric_entries:
            try:
                task_entries = os.listdir('/proc/' + tgid + '/task')
            except OSError:
                # The PID went away
                continue
            tids.extend(int(entry) for entry in task_entries)
        return tids

    def list_tids(self, tgid):
        """Return the thread IDs to query for tgid (just tgid itself unless
        options.processes is set), limited to options.pids when given."""
        if not self.options.processes:
            return [tgid]

        try:
            tids = [int(entry) for entry in os.listdir('/proc/%d/task' % tgid)]
        except OSError:
            return []

        if self.options.pids:
            tids = list(set(self.options.pids).intersection(set(tids)))

        return tids

    def update_process_counts(self):
        """Query taskstats for every monitored thread and return the vmstat
        delta since the previous call."""
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        connection = self.taskstats_connection
        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, connection)
                stats = connection.get_single_task_stats(thread)
                if stats:
                    thread.update_stats(stats)
                    thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Mark all threads, re-sample them, and keep only the processes
        whose update_stats() reports remaining threads. Returns the vmstat
        delta from update_process_counts()."""
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        self.processes = dict((pid, process)
                              for pid, process in self.processes.items()
                              if process.update_stats())

        return total_read_and_write

    def clear(self):
        """Forget every tracked process."""
        self.processes = {}