Support for getting and setting IO priority on armel and hppa architectures.
[iotop.git] / iotop / data.py
blob4f2da4b28b0e46480c2e9119f9a380986c5a5bc6
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
19 import errno
20 import glob
21 import os
22 import pprint
23 import pwd
24 import socket
25 import stat
26 import struct
27 import sys
28 import time
# Check for requirements:
# o Linux >= 2.6.20 with I/O accounting and VM event counters
# o Python >= 2.5 or Python 2.4 + ctypes

# The presence of /proc/self/io is used as the probe for kernel I/O
# accounting support (see the CONFIG_* options reported below).
ioaccounting = os.path.exists('/proc/self/io')

try:
    import ctypes
except ImportError:
    has_ctypes = False
else:
    has_ctypes = True

try:
    from iotop.vmstat import VmStat
    vmstat_f = VmStat()
except Exception:
    # Any failure to import or instantiate VmStat is reported as missing VM
    # event counter support. Catching Exception rather than using a bare
    # except lets Ctrl-C and sys.exit() through on Python >= 2.5.
    vm_event_counters = False
else:
    vm_event_counters = True

# Single-argument print(...) behaves exactly like the print statement on
# Python 2 and also parses on Python 3.
if not ioaccounting or not has_ctypes or not vm_event_counters:
    print('Could not run iotop as some of the requirements are not met:')
    if not ioaccounting or not vm_event_counters:
        print('- Linux >= 2.6.20 with')
        if not ioaccounting:
            print(' - I/O accounting support '
                  '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
                  'CONFIG_TASK_IO_ACCOUNTING)')
        if not vm_event_counters:
            print(' - VM event counters (CONFIG_VM_EVENT_COUNTERS)')
    if not has_ctypes:
        print('- Python >= 2.5 or Python 2.4 with the ctypes module')

    sys.exit(1)
68 from iotop import ioprio, vmstat
69 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
70 from genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class whose repr() pretty-prints every instance attribute,
    which makes objects easy to inspect when printed."""

    def __repr__(self):
        # str(type(self)) looks like "<class 'module.Name'>"; drop the final
        # '>' so the attribute dump can be placed before it.
        class_tag = str(type(self))[:-1]
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (class_tag, attributes)
79 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """Selected counters decoded from a raw taskstats buffer.

    Each entry of members_offsets names a counter and the byte offset at
    which it is stored as an unsigned 64-bit integer in native byte order.
    """
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Class-wide flag: becomes True once any task reports a non-zero
    # blkio_delay_total, and then stays True.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        fields = self.__dict__
        for field_name, field_offset in Stats.members_offsets:
            raw = task_stats_buffer[field_offset:field_offset + 8]
            (fields[field_name],) = struct.unpack('Q', raw)

        # Heuristic for whether CONFIG_TASK_DELAY_ACCT is enabled in the
        # kernel: a non-zero block I/O delay counter proves it is.
        if not Stats.has_blkio_delay_total:
            Stats.has_blkio_delay_total = self.blkio_delay_total != 0

    def accumulate(self, other_stats, destination, coeff=1):
        """Set every counter of destination to self + coeff * other_stats.

        destination may be self or other_stats; each counter is computed
        before it is stored.
        """
        mine = self.__dict__
        theirs = other_stats.__dict__
        target = destination.__dict__
        for field_name, _ in Stats.members_offsets:
            target[field_name] = mine[field_name] + coeff * theirs[field_name]

    def delta(self, other_stats, destination):
        """Set every counter of destination to self - other_stats."""
        return self.accumulate(other_stats, destination, coeff=-1)

    def is_all_zero(self):
        """Return True when every tracked counter is zero."""
        values = self.__dict__
        for field_name, _ in Stats.members_offsets:
            if values[field_name] != 0:
                return False
        return True

    @staticmethod
    def build_all_zero():
        """Return a Stats whose tracked counters are all zero, built without
        decoding any buffer."""
        zero = Stats.__new__(Stats)
        for field_name, _ in Stats.members_offsets:
            setattr(zero, field_name, 0)
        return zero
132 # Netlink usage for taskstats
# Identifiers used in requests sent to the TASKSTATS generic netlink family
# (values presumably mirror <linux/taskstats.h> -- confirm when updating).
TASKSTATS_CMD_GET = 1        # command of every request
TASKSTATS_CMD_ATTR_PID = 1   # request attribute carrying the target TID

# Attribute types expected in replies.
TASKSTATS_TYPE_PID = 1       # nested attribute holding the PID
TASKSTATS_TYPE_AGGR_PID = 4  # outer attribute wrapping PID + stats
class TaskStatsNetlink(object):
    """Query per-thread taskstats records over a generic netlink socket."""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        controller = Controller(self.connection)
        # Resolve the numeric ID of the 'TASKSTATS' generic netlink family.
        self.family_id = controller.get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Build the TASKSTATS_CMD_GET request message for thread tid."""
        return GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                           attrs=[U32Attr(TASKSTATS_CMD_ATTR_PID, tid)],
                           flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send thread.task_stats_request and decode the reply.

        Returns a Stats instance, or None when the kernel reports ESRCH (the
        thread is gone) or the reply is too short to hold every counter
        Stats reads. Other OSErrors propagate; replies with an unexpected
        layout fail the assertions below.
        """
        thread.task_stats_request.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            # Fetch the exception via sys.exc_info(): "except E, e" is
            # Python 2 only and "except E as e" needs Python >= 2.6.
            e = sys.exc_info()[1]
            if e.errno == errno.ESRCH:
                # OSError: Netlink error: No such process (3)
                return
            raise

        payload = reply.payload
        # 292 = 20 bytes of headers in front of the stats data + 272, the
        # end of the last counter Stats reads (offset 264 + 8 bytes).
        if len(payload) < 292:
            # Short reply
            return

        # Outer attribute header at bytes 4..7 (the first 4 bytes are
        # presumably the generic netlink header). With an 8-byte PID
        # attribute it spans 4 + 8 + 4 + 272 = 288 bytes.
        reply_length, reply_type = struct.unpack('HH', payload[4:8])
        assert reply_length >= 288
        assert reply_type == TASKSTATS_TYPE_AGGR_PID

        # Nested PID attribute; its length includes its own 4-byte header.
        pid_length, pid_type = struct.unpack('HH', payload[8:12])
        assert pid_type == TASKSTATS_TYPE_PID

        # Skip the leading 4 bytes, the outer attribute header, the whole PID
        # attribute and the 4-byte header of the stats attribute.
        taskstats_start = 4 + 4 + pid_length + 4
        taskstats_data = payload[taskstats_start:]
        taskstats_version = struct.unpack('H', taskstats_data[:2])[0]
        assert taskstats_version >= 4
        return Stats(taskstats_data)
182 # PIDs manipulations
def find_uids(options):
    """Fill options.uids with the numeric UIDs for options.users.

    Each entry may be a numeric UID or a user name. Every unknown user name
    is reported on stderr; if any was unknown, the program exits with
    status 1 after all entries have been examined.
    """
    resolved = []
    options.uids = resolved
    failed = False
    for user in options.users or []:
        try:
            resolved_uid = int(user)
        except ValueError:
            try:
                resolved_uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                print >> sys.stderr, 'Unknown user:', user
                failed = True
                continue
        if not failed:
            resolved.append(resolved_uid)
    if failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode the byte string s as UTF-8.

    If s is not valid UTF-8, return it escaped with the Python 2
    'string_escape' codec instead of raising.
    """
    try:
        text = s.decode('utf-8')
    except UnicodeDecodeError:
        return s.encode('string_escape')
    return text
class ThreadInfo(DumpableObject):
    """Stats for a single thread"""
    def __init__(self, tid, taskstats_connection):
        self.tid = tid
        # Set to False by ProcessList.update_process_counts() when the thread
        # is sampled again; ProcessInfo.update_stats() drops threads that are
        # still marked.
        self.mark = True
        # Latest cumulative sample, or None before the first one.
        self.stats_total = None
        # Start from real zero counters instead of an attribute-less Stats,
        # so a thread that has not received a sample yet can still be summed
        # or checked with is_all_zero() without a KeyError.
        self.stats_delta = Stats.build_all_zero()
        # Request message built once and re-sent on every sampling pass.
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return this thread's I/O priority as reported by ioprio.get()."""
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Set this thread's I/O priority; returns ioprio.set_ioprio()'s
        result."""
        return ioprio.set_ioprio(ioprio.IOPRIO_WHO_PROCESS, self.tid,
                                 ioprio_class, ioprio_data)

    def update_stats(self, stats):
        """Store stats as the new cumulative sample and put the difference
        from the previous sample into stats_delta (all zero on the first
        sample)."""
        if self.stats_total is None:
            self.stats_total = stats
        stats.delta(self.stats_total, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""
    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Return False when options restrict monitoring to other threads or
        other users, True otherwise."""
        if (options.pids and not options.processes and
                self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the UID owning /proc/PID, or None if the process is gone."""
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the (cached) user name for get_uid(), the UID as a string
        if it has no passwd entry, or '{none}' when the UID is unknown."""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except KeyError:
                self.user = str(uid)
        return self.user or '{none}'

    def get_proc_status_name(self):
        """Return '[name]' from the Name: line of /proc/PID/status, or a
        '{...}' placeholder if the file or the name is missing."""
        try:
            status_file = open('/proc/%d/status' % self.pid)
            try:
                first_line = status_file.readline()
            finally:
                # Close explicitly instead of relying on garbage collection.
                status_file.close()
        except IOError:
            return '{no such process}'
        prefix = 'Name:\t'
        if first_line.startswith(prefix):
            name = first_line[len(prefix):].strip()
        else:
            name = ''
        if name:
            name = '[%s]' % name
        else:
            name = '{no name}'
        return name

    def get_cmdline(self):
        """Return the command line with argv[0] reduced to its basename,
        or the status name when the command line is empty."""
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                # Close explicitly instead of relying on garbage collection.
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            return self.get_proc_status_name()
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline)

    def did_some_io(self, accumulated):
        """Return True if any counter is non-zero: in the accumulated totals
        when accumulated is true, otherwise in some thread's last delta."""
        if accumulated:
            return not self.stats_accum.is_all_zero()
        for t in self.threads.itervalues():
            if not t.stats_delta.is_all_zero():
                return True
        return False

    def get_ioprio(self):
        """Return the threads' common I/O priority, or '?dif' when they
        differ (or there are no threads)."""
        priorities = set(t.get_ioprio() for t in self.threads.itervalues())
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority to every thread of the process."""
        for thread in self.threads.itervalues():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating it on first use."""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Drop threads still marked, sum the remaining threads' deltas into
        stats_delta and add it to stats_accum.

        The delay totals are averaged over the live threads. Returns False
        (leaving the stats untouched) when no thread is left, True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        # Iterate over a snapshot because threads are deleted in the loop.
        for tid, thread in list(self.threads.items()):
            if thread.mark:
                del self.threads[tid]
            else:
                stats_delta.accumulate(thread.stats_delta, stats_delta)

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        stats_delta.blkio_delay_total /= nr_threads
        stats_delta.swapin_delay_total /= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """All monitored processes, keyed by PID, sampled from /proc and the
    taskstats netlink connection."""

    def __init__(self, taskstats_connection, options):
        self.processes = {}  # {pid: ProcessInfo}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # Take a first sample right away, as we are interested in the delta
        # between samples.
        self.update_process_counts()

    def get_process(self, pid):
        """Return the ProcessInfo for pid, creating and caching it the first
        time the PID is seen; return None if options exclude it."""
        process = self.processes.get(pid)
        if not process:
            process = ProcessInfo(pid)
            self.processes[pid] = process
        if not process.is_monitored(self.options):
            return None
        return process

    def list_tgids(self):
        """Return the IDs to visit: options.pids if given, otherwise every
        numeric /proc entry (processes mode) or every thread ID listed under
        /proc/PID/task (thread mode)."""
        if self.options.pids:
            return self.options.pids

        numeric_entries = [entry for entry in os.listdir('/proc')
                           if '0' <= entry[0] <= '9']
        if self.options.processes:
            return [int(entry) for entry in numeric_entries]

        thread_ids = []
        for entry in numeric_entries:
            task_dir = '/proc/' + entry + '/task'
            try:
                thread_ids.extend(map(int, os.listdir(task_dir)))
            except OSError:
                # The PID went away
                pass
        return thread_ids

    def list_tids(self, tgid):
        """Return the thread IDs to sample for tgid: tgid alone in thread
        mode, otherwise its /proc/PID/task entries (restricted to
        options.pids when given), or [] if the process is gone."""
        if not self.options.processes:
            return [tgid]

        try:
            tids = map(int, os.listdir('/proc/%d/task' % tgid))
        except OSError:
            return []

        if self.options.pids:
            tids = list(set(self.options.pids) & set(tids))
        return tids

    def update_process_counts(self):
        """Record the time since the previous sample in self.duration, fetch
        fresh taskstats for every monitored thread, and return
        self.vmstat.delta()."""
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        connection = self.taskstats_connection
        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if not process:
                continue
            for tid in self.list_tids(tgid):
                thread = process.get_thread(tid, connection)
                stats = connection.get_single_task_stats(thread)
                if stats:
                    thread.update_stats(stats)
                    thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Re-sample every thread, forget processes without live threads and
        return the value of self.vmstat.delta() for this pass."""
        # Mark everything; threads that get sampled again are unmarked.
        for process in self.processes.itervalues():
            for thread in process.threads.itervalues():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        vanished = [pid for pid, process in self.processes.items()
                    if not process.update_stats()]
        for pid in vanished:
            del self.processes[pid]

        return total_read_and_write

    def clear(self):
        """Forget every tracked process."""
        self.processes = {}