Some missed python3 conversions
[iotop.git] / iotop / data.py
blob8e5c70d2d8e8b5b4627f291d40bebfbd1b7e7a1a
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
19 import errno
20 import os
21 import pprint
22 import pwd
23 import stat
24 import struct
25 import sys
26 import time
# Check for requirements:
#   o Linux >= 2.6.20 with I/O accounting and VM event counters
#   o Python >= 2.5 or Python 2.4 + ctypes

# The presence of /proc/self/io is used as the indicator that the kernel
# provides I/O accounting.
ioaccounting = os.path.exists('/proc/self/io')

try:
    import ctypes
except ImportError:
    has_ctypes = False
else:
    has_ctypes = True

try:
    from iotop.vmstat import VmStat
    vmstat_f = VmStat()
except Exception:
    # Any failure to import or instantiate VmStat is reported as missing VM
    # event counters.  `Exception` is caught rather than using a bare
    # `except:` so that KeyboardInterrupt and SystemExit still propagate.
    vm_event_counters = False
else:
    vm_event_counters = True

if not ioaccounting or not has_ctypes or not vm_event_counters:
    # Explain every unmet requirement, then abort with a non-zero status.
    print('Could not run iotop as some of the requirements are not met:')
    if not ioaccounting or not vm_event_counters:
        print('- Linux >= 2.6.20 with')
        if not ioaccounting:
            print(' - I/O accounting support ' \
                  '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
                  'CONFIG_TASK_IO_ACCOUNTING)')
        if not vm_event_counters:
            print(' - VM event counters (CONFIG_VM_EVENT_COUNTERS)')
    if not has_ctypes:
        print('- Python >= 2.5 or Python 2.4 with the ctypes module')

    sys.exit(1)
66 from iotop import ioprio, vmstat
67 from iotop.netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
68 from iotop.genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class giving subclasses a readable repr of their attributes."""

    def __repr__(self):
        # str(type(self)) ends with '>'; strip it so that the pretty-printed
        # instance dictionary can be placed inside the angle brackets.
        type_text = str(type(self))
        opening = type_text[:-1]
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (opening, attributes)
77 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """Selected counters extracted from a raw taskstats buffer.

    Each entry of `members_offsets` is a (attribute name, byte offset) pair;
    the 8 bytes found at that offset are unpacked with struct format 'Q' and
    stored as an instance attribute of that name.
    """

    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Class-wide flag: becomes True once any parsed buffer carried a non-zero
    # blkio_delay_total.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        """Fill in every member from the bytes of `task_stats_buffer`."""
        fields = vars(self)
        for field, start in Stats.members_offsets:
            raw = task_stats_buffer[start:start + 8]
            (value,) = struct.unpack('Q', raw)
            fields[field] = value

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled
        # in the kernel.
        if not Stats.has_blkio_delay_total:
            Stats.has_blkio_delay_total = self.blkio_delay_total != 0

    def accumulate(self, other_stats, destination, coeff=1):
        """Store self + coeff * other_stats, member by member, in destination."""
        mine = vars(self)
        theirs = vars(other_stats)
        target = vars(destination)
        for field, _ in Stats.members_offsets:
            target[field] = mine[field] + coeff * theirs[field]

    def delta(self, other_stats, destination):
        """Store self - other_stats, member by member, in destination."""
        return self.accumulate(other_stats, destination, coeff=-1)

    def is_all_zero(self):
        """Return True when every member listed in members_offsets equals 0."""
        fields = vars(self)
        return not any(fields[field] != 0
                       for field, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats built without a buffer, with every member set to 0."""
        zeroed = Stats.__new__(Stats)
        vars(zeroed).update((field, 0) for field, _ in Stats.members_offsets)
        return zeroed
130 # Netlink usage for taskstats
# Numeric identifiers used for the TASKSTATS generic netlink exchange.
# NOTE(review): the values presumably mirror <linux/taskstats.h> -- confirm
# against the kernel header before changing any of them.

# Command, and the attribute carrying the target PID/TID, sent in a request.
TASKSTATS_CMD_GET = 1
TASKSTATS_CMD_ATTR_PID = 1

# Attribute types found in a reply.
TASKSTATS_TYPE_PID = 1
TASKSTATS_TYPE_STATS = 3
TASKSTATS_TYPE_AGGR_PID = 4
class TaskStatsNetlink(object):
    """Fetches per-task statistics over a generic netlink connection."""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        """Open the netlink connection and look up the TASKSTATS family id."""
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Return the GeNlMessage that requests the stats of task `tid`."""
        pid_attr = U32Attr(TASKSTATS_CMD_ATTR_PID, tid)
        return GeNlMessage(self.family_id,
                           cmd=TASKSTATS_CMD_GET,
                           attrs=[pid_attr],
                           flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send the thread's prepared request and parse the reply.

        Returns a Stats instance, or None when the task no longer exists,
        the reply has no TASKSTATS_TYPE_AGGR_PID attribute, or the stats
        payload is shorter than 272 bytes.  Any OSError other than ESRCH is
        re-raised.
        """
        thread.task_stats_request.send(self.connection)
        try:
            reply = GeNlMessage.recv(self.connection)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None

        # Locate the aggregate attribute; its nested attributes hold the
        # actual statistics.
        found = False
        nested = None
        for attr_type, attr_value in reply.attrs.items():
            if attr_type == TASKSTATS_TYPE_AGGR_PID:
                nested = attr_value.nested()
                found = True
                break
        if not found:
            return None

        payload = nested[TASKSTATS_TYPE_STATS].data
        # 272 is the end of the last field read by Stats (offset 264 + 8).
        if len(payload) < 272:
            # Short reply
            return None

        # The first two bytes of the payload hold the taskstats version.
        version = struct.unpack('H', payload[:2])[0]
        assert version >= 4
        return Stats(payload)
177 # PIDs manipulations
def find_uids(options):
    """Populate options.uids from options.users.

    Each entry of options.users is taken as a numeric UID when it parses as
    an integer, otherwise it is looked up as a user name.  Unknown names are
    reported on stderr and make the function exit with status 1 once all
    entries have been examined.
    """
    options.uids = []
    failed = False
    for entry in options.users or []:
        try:
            uid = int(entry)
        except ValueError:
            try:
                uid = pwd.getpwnam(entry).pw_uid
            except KeyError:
                print('Unknown user:', entry, file=sys.stderr)
                failed = True
                continue
        # After the first failure nothing more is collected; the remaining
        # entries are only checked so that every unknown user is reported.
        if failed:
            continue
        options.uids.append(uid)
    if failed:
        sys.exit(1)
def parse_proc_pid_status(pid):
    """Parse /proc/<pid>/status into a {field name: value} dictionary.

    Each line is split once on ':\\t'; the part before becomes the key and
    the stripped remainder becomes the value.  Lines that do not contain
    that separator are skipped instead of aborting the whole parse.

    Returns an empty dictionary when the file cannot be opened (for example
    because the process no longer exists); if reading fails part-way, the
    fields parsed so far are returned.
    """
    result_dict = {}
    try:
        # The context manager closes the file promptly instead of leaving
        # the handle to the garbage collector.
        with open('/proc/%d/status' % pid) as status_file:
            for line in status_file:
                key, separator, value = line.partition(':\t')
                if not separator:
                    # Not a "Key:\tvalue" line; ignore it.
                    continue
                result_dict[key] = value.strip()
    except IOError:
        pass  # No such process
    return result_dict
def safe_utf8_decode(s):
    """Return `s` as text without ever raising on undecodable input.

    * Objects without a `decode` method (e.g. `str` on Python 3) are
      returned unchanged.
    * Byte strings that are valid UTF-8 are decoded normally.
    * Byte strings containing invalid UTF-8 are decoded with the
      'backslashreplace' error handler, so each offending byte shows up as
      a `\\xNN` escape.  The former fallback, `s.encode('string_escape')`,
      cannot work on Python 3: `bytes` has no `encode` method, and the
      resulting AttributeError was raised inside the `except` handler where
      the sibling `except AttributeError` clause does not apply.
    """
    try:
        decode = s.decode
    except AttributeError:
        # Already text (or something that is not decodable at all).
        return s
    try:
        return decode('utf-8')
    except UnicodeDecodeError:
        return decode('utf-8', 'backslashreplace')
    except AttributeError:
        return s
class ThreadInfo(DumpableObject):
    """Stats for a single thread"""

    def __init__(self, tid, taskstats_connection):
        """Remember `tid` and prebuild the taskstats request for it."""
        self.tid = tid
        self.mark = True
        # No sample has been recorded yet.
        self.stats_total = None
        # Created without running Stats.__init__, so it has no members until
        # update_stats() fills it in.
        self.stats_delta = Stats.__new__(Stats)
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return the I/O priority reported by ioprio.get() for this thread."""
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority class and data to this thread."""
        return ioprio.set_ioprio(
            ioprio.IOPRIO_WHO_PROCESS,
            self.tid,
            ioprio_class,
            ioprio_data,
        )

    def update_stats(self, stats):
        """Record `stats` as the latest sample and refresh stats_delta.

        On the first sample the previous total is the sample itself, so the
        resulting delta is zero for every member.
        """
        previous = self.stats_total
        if not previous:
            previous = self.stats_total = stats
        stats.delta(previous, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""

    def __init__(self, pid):
        """Create an entry for `pid` with no threads and all-zero stats."""
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Return False when the PID or UID filters in `options` exclude us."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the owner UID of /proc/<pid>, or None if it is gone.

        A cached non-zero UID is returned directly.  When the looked-up UID
        differs from the cached one, the cached user name is reset as well.
        """
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the user name for our UID, or '{none}' when unknown.

        Falls back to the UID rendered as a string when the password
        database lookup fails.
        """
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except (KeyError, AttributeError):
                self.user = str(uid)
        return self.user or '{none}'

    def get_cmdline(self):
        """Return a printable command line for this process.

        * '{no such process}' when /proc/<pid>/cmdline cannot be opened.
        * '[name]' (or '{no name}') built from /proc/<pid>/status when the
          command line is empty.
        * Otherwise the arguments joined by spaces, with the directory part
          of an absolute argv[0] removed, plus ' [thread name]' when this is
          not the main thread and its name differs from the main thread's.
        """
        # A process may exec, so we must always reread its cmdline
        try:
            # `with` guarantees the descriptor is released right away; this
            # method may be invoked for every displayed process on every
            # refresh.
            with open('/proc/%d/cmdline' % self.pid) as proc_cmdline:
                cmdline = proc_cmdline.read(4096)
        except IOError:
            return '{no such process}'
        proc_status = parse_proc_pid_status(self.pid)
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            proc_status_name = proc_status.get('Name', '')
            if proc_status_name:
                proc_status_name = '[%s]' % proc_status_name
            else:
                proc_status_name = '{no name}'
            return proc_status_name
        suffix = ''
        tgid = int(proc_status.get('Tgid', self.pid))
        if tgid != self.pid:
            # Not the main thread, maybe it has a custom name
            tgid_name = parse_proc_pid_status(tgid).get('Name', '')
            thread_name = proc_status.get('Name', '')
            if thread_name != tgid_name:
                suffix += ' [%s]' % thread_name
        # Arguments in /proc/<pid>/cmdline are NUL-separated.
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            # Keep only the basename of an absolute executable path.
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline + suffix)

    def did_some_io(self, accumulated):
        """Return True if any I/O was recorded.

        With `accumulated` set, the accumulated stats are consulted;
        otherwise the latest delta of each thread is.
        """
        if accumulated:
            return not self.stats_accum.is_all_zero()
        for t in self.threads.values():
            if not t.stats_delta.is_all_zero():
                return True
        return False

    def get_ioprio(self):
        """Return the I/O priority shared by all threads, or '?dif'.

        '?dif' is returned whenever the threads do not yield exactly one
        distinct priority (which includes having no thread at all).
        """
        priorities = {t.get_ioprio() for t in self.threads.values()}
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority to every thread of the process."""
        for thread in self.threads.values():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        """Return the sort key ioprio.sort_key() derives from our priority."""
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for `tid`, creating it on first use."""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Aggregate the thread deltas and prune threads still marked.

        Returns False when no unmarked thread remains (stats are then left
        untouched), True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        for thread in self.threads.values():
            if not thread.mark:
                stats_delta.accumulate(thread.stats_delta, stats_delta)
        # Threads whose mark is still set are dropped.
        self.threads = {tid: thread for tid, thread in self.threads.items()
                        if not thread.mark}

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # The delay totals are averaged over the threads rather than summed.
        stats_delta.blkio_delay_total /= nr_threads
        stats_delta.swapin_delay_total /= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """Tracks every known ProcessInfo and refreshes their statistics."""

    def __init__(self, taskstats_connection, options):
        """Set up an empty list and take an initial measurement."""
        self.processes = {}  # {pid: ProcessInfo}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Return the ProcessInfo for `pid`, creating it on first sight.

        None is returned when the process is not monitored under the current
        options; the ProcessInfo is nevertheless kept in self.processes.
        """
        process = self.processes.get(pid, None)
        if not process:
            process = self.processes[pid] = ProcessInfo(pid)

        if not process.is_monitored(self.options):
            return None
        return process

    def list_tgids(self):
        """Return the IDs to iterate over for this refresh.

        These are options.pids when given; otherwise the numeric entries of
        /proc when options.processes is set, or else every entry found in
        the task directories of those numeric entries.
        """
        if self.options.pids:
            return self.options.pids

        entries = os.listdir('/proc')
        if self.options.processes:
            return [int(entry) for entry in entries
                    if '0' <= entry[0] <= '9']

        tids = []
        for entry in entries:
            if not '0' <= entry[0] <= '9':
                continue
            task_dir = '/proc/' + entry + '/task'
            try:
                tids.extend(map(int, os.listdir(task_dir)))
            except OSError:
                # The PID went away
                pass
        return tids

    def list_tids(self, tgid):
        """Return the thread IDs to query for `tgid`.

        Without options.processes the ID stands for itself.  Otherwise the
        entries of /proc/<tgid>/task are returned (an empty list when that
        directory cannot be listed), restricted to options.pids when given.
        """
        if not self.options.processes:
            return [tgid]

        try:
            names = os.listdir('/proc/%d/task' % tgid)
            tids = list(map(int, names))
        except OSError:
            return []

        if self.options.pids:
            requested = set(self.options.pids)
            tids = list(requested.intersection(set(tids)))

        return tids

    def update_process_counts(self):
        """Query fresh stats for every monitored thread.

        Also updates self.duration and self.timestamp, and returns the
        result of self.vmstat.delta().
        """
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        connection = self.taskstats_connection
        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, connection)
                stats = connection.get_single_task_stats(thread)
                if stats:
                    thread.update_stats(stats)
                    # Clearing the mark records that the thread was seen
                    # with stats during this pass.
                    thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Refresh all stats, drop vanished processes, return the totals."""
        # Mark every thread; update_process_counts() clears the mark of the
        # ones it obtains stats for.
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        # ProcessInfo.update_stats() returns False once a process has no
        # unmarked thread left; such processes are removed.
        self.processes = {pid: process
                          for pid, process in self.processes.items()
                          if process.update_stats()}

        return total_read_and_write

    def clear(self):
        """Forget every known process."""
        self.processes = {}