data.py get_cmdline: use errors=replace for unicode errors
[iotop.git] / iotop / data.py
blob8c1acaee42419c3414e722af1ae66af2a6d3f328
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
19 # Allow printing with same syntax in Python 2/3
20 from __future__ import print_function
22 import errno
23 import os
24 import pprint
25 import pwd
26 import stat
27 import struct
28 import sys
29 import time
31 # Try to ensure time.monotonic() is available
32 # This normally requires Python 3.3 or later.
33 # Use PyPI monotonic if needed and available.
34 # Fall back on non-monotonic time if needed.
# Nothing to do when the interpreter already ships time.monotonic().
if not hasattr(time, 'monotonic'):
    try:
        from monotonic import monotonic
    except (ImportError, RuntimeError):
        # Backport missing or unusable: settle for wall-clock time.
        time.monotonic = time.time
    else:
        time.monotonic = monotonic
43 # Check for requirements:
44 # o Linux >= 2.6.20 with I/O accounting and VM event counters
# True when procfs exposes the per-process I/O accounting file.
ioaccounting = os.path.exists('/proc/self/io')

try:
    from iotop.vmstat import VmStat
    # Building a VmStat acts as the availability probe: an IOError raised here
    # is interpreted as "VM event counters are not available".
    # NOTE(review): vmstat_f is not referenced again in the visible part of
    # this file -- confirm nothing else depends on it before removing it.
    vmstat_f = VmStat()
except IOError:
    vm_event_counters = False
else:
    vm_event_counters = True

# Abort at import time, listing every requirement that is not satisfied.
if not ioaccounting or not vm_event_counters:
    print('Could not run iotop as some of the requirements are not met:')
    print('- Linux >= 2.6.20 with')
    if not ioaccounting:
        print(' - I/O accounting support ' \
              '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
              'CONFIG_TASK_IO_ACCOUNTING, kernel.task_delayacct sysctl)')
    if not vm_event_counters:
        print(' - VM event counters (CONFIG_VM_EVENT_COUNTERS)')
    sys.exit(1)
68 from iotop import ioprio, vmstat
69 from iotop.netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
70 from iotop.genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class whose repr shows the concrete type and instance attributes"""

    def __repr__(self):
        # str(type(self)) looks like "<class 'pkg.Name'>": drop its closing
        # '>' and put it back after the pretty-printed attribute dictionary.
        type_prefix = str(type(self))[:-1]
        attributes = pprint.pformat(vars(self))
        return '{0}: {1}>'.format(type_prefix, attributes)
81 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """Counters picked out of one raw taskstats buffer.

    Every entry of members_offsets pairs an attribute name with the byte
    offset at which its 8-byte unsigned value starts in the buffer.
    """

    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Class-wide and sticky: becomes True the first time a parsed buffer
    # carries a non-zero blkio_delay_total, and is never reset afterwards.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        """Decode every known counter out of task_stats_buffer."""
        fields = vars(self)
        for field, start in Stats.members_offsets:
            raw = task_stats_buffer[start:start + 8]
            (fields[field],) = struct.unpack('Q', raw)

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
        # the kernel.
        if Stats.has_blkio_delay_total:
            return
        Stats.has_blkio_delay_total = self.blkio_delay_total != 0

    def accumulate(self, other_stats, destination, coeff=1):
        """Store self + coeff * other_stats into destination, field by field.

        destination may be self or other_stats: each field is computed from
        the same-named field only, so aliasing is harmless.
        """
        target = vars(destination)
        mine = vars(self)
        theirs = vars(other_stats)
        for field, _ in Stats.members_offsets:
            target[field] = mine[field] + coeff * theirs[field]

    def delta(self, other_stats, destination):
        """Store self - other_stats into destination."""
        return self.accumulate(other_stats, destination, coeff=-1)

    def is_all_zero(self):
        """Return True when no known counter differs from zero."""
        fields = vars(self)
        return not any(fields[field] != 0
                       for field, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Create a Stats with every counter at 0, without parsing a buffer."""
        blank = Stats.__new__(Stats)
        vars(blank).update((field, 0) for field, _ in Stats.members_offsets)
        return blank
134 # Netlink usage for taskstats
# Numeric identifiers exchanged with the 'TASKSTATS' generic netlink family.
# NOTE(review): presumably these mirror the enums of <linux/taskstats.h> --
# confirm against the kernel headers before changing any value.
TASKSTATS_CMD_GET = 1  # command of the request built by build_request()
TASKSTATS_CMD_ATTR_PID = 1  # request attribute type that carries the tid
TASKSTATS_TYPE_AGGR_PID = 4  # reply attribute whose nested attrs are parsed
TASKSTATS_TYPE_PID = 1  # not referenced in the visible part of this file
TASKSTATS_TYPE_STATS = 3  # nested attribute whose data is given to Stats()
class TaskStatsNetlink(object):
    """Requests per-task statistics from the 'TASKSTATS' netlink family."""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Return the (reusable) request message asking for the stats of tid."""
        pid_attr = U32Attr(TASKSTATS_CMD_ATTR_PID, tid)
        return GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                           attrs=[pid_attr], flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send the request stored on thread and parse the reply.

        Returns a Stats instance, or None when the task no longer exists,
        the reply has no aggregate-PID attribute, or the payload is too
        short. Any OSError other than ESRCH is propagated.
        """
        thread.task_stats_request.send(self.connection)
        try:
            reply = GeNlMessage.recv(self.connection)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None

        for attr_type, attr_value in reply.attrs.items():
            if attr_type == TASKSTATS_TYPE_AGGR_PID:
                nested_attrs = attr_value.nested()
                break
        else:
            return None

        taskstats_data = nested_attrs[TASKSTATS_TYPE_STATS].data
        # 272 is the end of the last field read by Stats (offset 264 + 8).
        if len(taskstats_data) < 272:
            # Short reply
            return None
        (taskstats_version,) = struct.unpack('H', taskstats_data[:2])
        assert taskstats_version >= 4
        return Stats(taskstats_data)
182 # PIDs manipulations
def find_uids(options):
    """Fill options.uids with the numeric UID of every entry of options.users.

    Entries that parse as integers are used as-is; anything else is looked up
    as a user name. Every unknown name is reported on stderr, and the program
    exits with status 1 once all entries have been examined.
    """
    options.uids = []
    lookup_failed = False
    for entry in options.users or []:
        try:
            uid = int(entry)
        except ValueError:
            try:
                uid = pwd.getpwnam(entry).pw_uid
            except KeyError:
                print('Unknown user:', entry, file=sys.stderr)
                lookup_failed = True
        if lookup_failed:
            # Keep scanning so that every unknown user gets reported.
            continue
        options.uids.append(uid)
    if lookup_failed:
        sys.exit(1)
def parse_proc_pid_status(pid):
    """Return the fields of /proc/<pid>/status as a {key: value} dict.

    Each line is split on its first ':' and the value is stripped of
    surrounding whitespace. An empty dict is returned when the file cannot
    be opened or read (typically because the process no longer exists).
    """
    result_dict = {}
    try:
        # The context manager closes the file deterministically; this runs
        # for every process on every refresh, so leaked descriptors add up.
        with open('/proc/%d/status' % pid, errors='replace') as status_file:
            for line in status_file:
                try:
                    key, value = line.split(':', 1)
                except ValueError:
                    # Ignore lines that are not formatted correctly as
                    # some downstream kernels may have weird lines and
                    # the needed fields are probably formatted correctly.
                    continue
                result_dict[key] = value.strip()
    except IOError:
        pass  # No such process
    return result_dict
def safe_utf8_decode(s):
    """Decode s as UTF-8 without ever raising on malformed input.

    - Valid UTF-8 byte strings are decoded normally.
    - Byte strings with invalid sequences are returned with the offending
      bytes rendered as backslash escapes.
    - Objects without a decode() method (already-decoded text, None, ...) are
      returned unchanged.
    """
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError:
        try:
            # Python 2 byte strings: keep the historical escaped form.
            return s.encode('string_escape')
        except (AttributeError, LookupError):
            # Python 3: bytes has no encode() (and there is no
            # 'string_escape' codec), so escape only the undecodable bytes.
            return s.decode('utf-8', 'backslashreplace')
    except AttributeError:
        return s
class ThreadInfo(DumpableObject):
    """Per-thread bookkeeping: latest totals and the delta since last update"""

    def __init__(self, tid, taskstats_connection):
        self.tid = tid
        # Starts out True; ProcessList flips it to False once stats have been
        # fetched for this thread during a refresh.
        self.mark = True
        self.stats_total = None
        # Allocated without running Stats.__init__; update_stats() fills it.
        self.stats_delta = Stats.__new__(Stats)
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        return ioprio.set_ioprio(ioprio.IOPRIO_WHO_PROCESS, self.tid,
                                 ioprio_class, ioprio_data)

    def update_stats(self, stats):
        """Record stats as the new totals and refresh stats_delta.

        On the first call there is no previous total, so stats is compared
        with itself.
        """
        self.stats_total = self.stats_total or stats
        stats.delta(self.stats_total, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""

    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.monotonic()

    def is_monitored(self, options):
        """Return False when the PID or user filters in options exclude us."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the owner UID of /proc/<pid>, or None if it disappeared."""
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the
        # process runs as root so it can change its UID. In both cases it means
        # we have to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the user name for get_uid(), the UID as text when it has no
        passwd entry, or '{none}' when the UID is unknown."""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except (KeyError, AttributeError):
                self.user = str(uid)
        return self.user or '{none}'

    def get_cmdline(self):
        """Return a printable command line for this process or thread.

        Falls back to '[name]' from /proc/<pid>/status when the command line
        is empty, and to '{no such process}' when it cannot be read. For a
        thread whose name differs from its thread group leader's, the thread
        name is appended in brackets.
        """
        # A process may exec, so we must always reread its cmdline
        try:
            # 'with' closes the file right away; this is called for every
            # displayed process on every refresh.
            with open('/proc/%d/cmdline' % self.pid,
                      errors='replace') as proc_cmdline:
                cmdline = proc_cmdline.read(4096)
        except IOError:
            return '{no such process}'
        proc_status = parse_proc_pid_status(self.pid)
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            proc_status_name = proc_status.get('Name', '')
            if proc_status_name:
                proc_status_name = '[%s]' % proc_status_name
            else:
                proc_status_name = '{no name}'
            return proc_status_name
        suffix = ''
        tgid = int(proc_status.get('Tgid', self.pid))
        if tgid != self.pid:
            # Not the main thread, maybe it has a custom name
            tgid_name = parse_proc_pid_status(tgid).get('Name', '')
            thread_name = proc_status.get('Name', '')
            if thread_name != tgid_name:
                suffix += ' [%s]' % thread_name
        # Arguments are NUL-separated in /proc/PID/cmdline
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            # Show only the basename of an absolute executable path
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline + suffix)

    def did_some_io(self, accumulated):
        """Return True if any counter is non-zero, looking at the accumulated
        stats when accumulated is true and at per-thread deltas otherwise."""
        if accumulated:
            return not self.stats_accum.is_all_zero()
        for t in self.threads.values():
            if not t.stats_delta.is_all_zero():
                return True
        return False

    def get_ioprio(self):
        """Return the I/O priority shared by all threads, or '?dif' when the
        threads do not all report the same one (or there are no threads)."""
        priorities = set(t.get_ioprio() for t in self.threads.values())
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        for thread in self.threads.values():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating it on first sight."""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Aggregate the deltas of the threads whose mark was cleared.

        Threads still marked are dropped. Returns False when no thread is
        left, True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        for tid, thread in self.threads.items():
            if not thread.mark:
                stats_delta.accumulate(thread.stats_delta, stats_delta)
        self.threads = {tid: thread for tid, thread in self.threads.items()
                        if not thread.mark}

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # The delay counters are averaged over the threads, not summed
        stats_delta.blkio_delay_total /= nr_threads
        stats_delta.swapin_delay_total /= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """Registry of every ProcessInfo seen so far, refreshed on demand"""

    def __init__(self, taskstats_connection, options):
        # {pid: ProcessInfo}
        self.processes = {}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.monotonic()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Look up pid in self.processes, registering a fresh ProcessInfo when
        it is new; the result is None if the process is not monitored"""
        process = self.processes.get(pid)
        if not process:
            process = self.processes[pid] = ProcessInfo(pid)
        return process if process.is_monitored(self.options) else None

    def list_tgids(self):
        """Return the ids to visit: options.pids when given, else every
        numeric /proc entry (process mode) or every task id (thread mode)"""
        if self.options.pids:
            return self.options.pids

        numeric_entries = [name for name in os.listdir('/proc')
                           if '0' <= name[0] <= '9']
        if self.options.processes:
            return [int(name) for name in numeric_entries]

        tids = []
        for name in numeric_entries:
            try:
                task_names = os.listdir('/proc/' + name + '/task')
            except OSError:
                # The PID went away
                continue
            tids.extend(map(int, task_names))
        return tids

    def list_tids(self, tgid):
        """Return the thread ids to query for tgid"""
        if not self.options.processes:
            return [tgid]

        try:
            tids = [int(name) for name in os.listdir('/proc/%d/task' % tgid)]
        except OSError:
            return []

        if self.options.pids:
            tids = list(set(self.options.pids) & set(tids))

        return tids

    def update_process_counts(self):
        """Fetch fresh stats for every monitored thread and return
        ((total_read, total_write), vmstat delta) for this interval"""
        now = time.monotonic()
        self.duration = now - self.timestamp
        self.timestamp = now

        total_read = 0
        total_write = 0

        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, self.taskstats_connection)
                stats = self.taskstats_connection.get_single_task_stats(thread)
                if not stats:
                    continue
                thread.update_stats(stats)
                total_read += thread.stats_delta.read_bytes
                total_write += thread.stats_delta.write_bytes
                # Seen during this pass: keep it on the next cleanup
                thread.mark = False
        return (total_read, total_write), self.vmstat.delta()

    def refresh_processes(self):
        """Run one full refresh and drop the processes left without threads"""
        # Mark everything; update_process_counts() clears the mark of each
        # thread it manages to fetch stats for.
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        survivors = {}
        for pid, process in self.processes.items():
            if process.update_stats():
                survivors[pid] = process
        self.processes = survivors

        return total_read_and_write

    def clear(self):
        self.processes = {}
def sysctl_task_delayacct():
    """Report the kernel.task_delayacct sysctl as a boolean.

    Returns None when /proc/sys/kernel/task_delayacct does not exist.
    """
    try:
        with open('/proc/sys/kernel/task_delayacct') as sysctl_file:
            raw_value = sysctl_file.read()
        return int(raw_value.strip()) != 0
    except FileNotFoundError:
        return None