1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2007 Guillaume Chazarain <guichaz@gmail.com>
31 # Check for requirements:
32 # o Linux >= 2.6.20 with I/O accounting and VM event counters
33 # o Python >= 2.5 or Python 2.4 + ctypes
36 ioaccounting
= os
.path
.exists('/proc/self/io')
46 from iotop
.vmstat
import VmStat
49 vm_event_counters
= False
51 vm_event_counters
= True
53 if not ioaccounting
or not has_ctypes
or not vm_event_counters
:
54 print 'Could not run iotop as some of the requirements are not met:'
55 if not ioaccounting
or not vm_event_counters
:
56 print '- Linux >= 2.6.20 with'
58 print ' - I/O accounting support ' \
59 '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
60 'CONFIG_TASK_IO_ACCOUNTING)'
61 if not vm_event_counters
:
62 print ' - VM event counters (CONFIG_VM_EVENT_COUNTERS)'
64 print '- Python >= 2.5 or Python 2.4 with the ctypes module'
68 from iotop
import ioprio
, vmstat
69 from netlink
import Connection
, NETLINK_GENERIC
, U32Attr
, NLM_F_REQUEST
70 from genetlink
import Controller
, GeNlMessage
72 class DumpableObject(object):
73 """Base class for all objects that allows easy introspection when printed"""
75 return '%s: %s>' % (str(type(self
))[:-1], pprint
.pformat(self
.__dict
__))
79 # Interesting fields in a taskstats output
82 class Stats(DumpableObject
):
84 ('blkio_delay_total', 40),
85 ('swapin_delay_total', 56),
88 ('cancelled_write_bytes', 264)
91 has_blkio_delay_total
= False
93 def __init__(self
, task_stats_buffer
):
95 for name
, offset
in Stats
.members_offsets
:
96 data
= task_stats_buffer
[offset
:offset
+ 8]
97 sd
[name
] = struct
.unpack('Q', data
)[0]
99 # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
101 if not Stats
.has_blkio_delay_total
:
102 Stats
.has_blkio_delay_total
= self
.blkio_delay_total
!= 0
def accumulate(self, other_stats, destination, coeff=1):
    """Store ``self + coeff * other_stats`` into destination, field by field.

    Only the fields named in Stats.members_offsets are touched.  The
    values are read from and written to the instances' __dict__ directly.
    Each field is read before it is written, so destination may be the
    same object as self or other_stats (in-place update).

    other_stats -- object whose fields are scaled by coeff and added
    destination -- object receiving the result
    coeff       -- multiplier applied to other_stats (default 1)

    Returns None.
    """
    # Bind all three attribute dictionaries once, outside the loop.
    sd = self.__dict__
    od = other_stats.__dict__
    dd = destination.__dict__
    for member, _offset in Stats.members_offsets:
        dd[member] = sd[member] + coeff * od[member]
def delta(self, other_stats, destination):
    """Store ``self - other_stats`` into destination.

    This is accumulate() with a coefficient of -1; whatever accumulate()
    returns is handed back to the caller unchanged.
    """
    outcome = self.accumulate(other_stats, destination, coeff=-1)
    return outcome
116 def is_all_zero(self
):
118 for name
, offset
in Stats
.members_offsets
:
124 def build_all_zero():
125 stats
= Stats
.__new
__(Stats
)
127 for name
, offset
in Stats
.members_offsets
:
132 # Netlink usage for taskstats
# Request side: command number and attribute type used when building the
# GeNlMessage that asks for the statistics of one task id.
TASKSTATS_CMD_GET = 1
TASKSTATS_CMD_ATTR_PID = 1

# Reply side: attribute types that get_single_task_stats() checks for in
# the payload (the inner pid attribute, then the enclosing aggregate).
TASKSTATS_TYPE_PID = 1
TASKSTATS_TYPE_AGGR_PID = 4
140 class TaskStatsNetlink(object):
141 # Keep in sync with format_stats() and pinfo.did_some_io()
def __init__(self, options):
    """Open a generic netlink connection and resolve the TASKSTATS family.

    options -- stored unchanged on the instance as self.options

    After construction, self.connection is the Connection used for all
    requests and self.family_id is the id that the Controller reports for
    the 'TASKSTATS' family.
    """
    self.options = options
    self.connection = Connection(NETLINK_GENERIC)
    # The controller is only needed for this one lookup, so it is not kept.
    self.family_id = Controller(self.connection).get_family_id('TASKSTATS')
149 def build_request(self
, tid
):
150 return GeNlMessage(self
.family_id
, cmd
=TASKSTATS_CMD_GET
,
151 attrs
=[U32Attr(TASKSTATS_CMD_ATTR_PID
, tid
)],
154 def get_single_task_stats(self
, thread
):
155 thread
.task_stats_request
.send(self
.connection
)
157 reply
= self
.connection
.recv()
159 if e
.errno
== errno
.ESRCH
:
160 # OSError: Netlink error: No such process (3)
163 if len(reply
.payload
) < 292:
166 reply_data
= reply
.payload
[20:]
168 reply_length
, reply_type
= struct
.unpack('HH', reply
.payload
[4:8])
169 assert reply_length
>= 288
170 assert reply_type
== TASKSTATS_TYPE_AGGR_PID
172 pid_length
, pid_type
= struct
.unpack('HH', reply
.payload
[8:12])
173 assert pid_type
== TASKSTATS_TYPE_PID
175 taskstats_start
= 4 + 4 + pid_length
+ 4
176 taskstats_data
= reply
.payload
[taskstats_start
:]
177 taskstats_version
= struct
.unpack('H', taskstats_data
[:2])[0]
178 assert taskstats_version
>= 4
179 return Stats(taskstats_data
)
185 def find_uids(options
):
186 """Build options.uids from options.users by resolving usernames to UIDs"""
189 for u
in options
.users
or []:
194 passwd
= pwd
.getpwnam(u
)
196 print >> sys
.stderr
, 'Unknown user:', u
201 options
.uids
.append(uid
)
def safe_utf8_decode(s):
    """Decode the byte string s as UTF-8, falling back to an escaped form.

    s -- byte string to decode

    Returns the decoded text when s is valid UTF-8.  Otherwise returns s
    passed through the 'string_escape' codec, so invalid bytes are shown
    as escape sequences instead of raising.
    """
    try:
        decoded = s.decode('utf-8')
    except UnicodeDecodeError:
        # Not valid UTF-8: keep the data printable rather than failing.
        decoded = s.encode('string_escape')
    return decoded
211 class ThreadInfo(DumpableObject
):
212 """Stats for a single thread"""
213 def __init__(self
, tid
, taskstats_connection
):
216 self
.stats_total
= None
217 self
.stats_delta
= Stats
.__new
__(Stats
)
218 self
.task_stats_request
= taskstats_connection
.build_request(tid
)
def get_ioprio(self):
    """Return whatever ioprio.get() reports for this thread's tid."""
    tid = self.tid
    return ioprio.get(tid)
def set_ioprio(self, ioprio_class, ioprio_data):
    """Apply an I/O priority to this thread.

    ioprio_class, ioprio_data -- passed through unchanged to
    ioprio.set_ioprio(), which is called with IOPRIO_WHO_PROCESS and this
    thread's tid as the target.

    Returns the result of ioprio.set_ioprio().
    """
    target_kind = ioprio.IOPRIO_WHO_PROCESS
    return ioprio.set_ioprio(target_kind, self.tid, ioprio_class, ioprio_data)
def update_stats(self, stats):
    """Record a new sample and refresh self.stats_delta.

    stats -- the latest sample for this thread; it must provide
             delta(other_stats, destination)

    self.stats_delta receives ``stats - previous sample``.  On the first
    call there is no previous sample, so stats is compared with itself.
    Afterwards stats becomes the stored previous sample.
    """
    # Fall back to the new sample itself when nothing was recorded yet.
    self.stats_total = self.stats_total or stats
    stats.delta(self.stats_total, self.stats_delta)
    self.stats_total = stats
234 class ProcessInfo(DumpableObject
):
235 """Stats for a single process (a single line in the output): if
236 options.processes is set, it is a collection of threads, otherwise a single
238 def __init__(self
, pid
):
242 self
.threads
= {} # {tid: ThreadInfo}
243 self
.stats_delta
= Stats
.build_all_zero()
244 self
.stats_accum
= Stats
.build_all_zero()
245 self
.stats_accum_timestamp
= time
.time()
247 def is_monitored(self
, options
):
248 if (options
.pids
and not options
.processes
and
249 self
.pid
not in options
.pids
):
250 # We only monitor some threads, not this one
253 if options
.uids
and self
.get_uid() not in options
.uids
:
254 # We only monitor some users, not this one
262 # uid in (None, 0) means either we don't know the UID yet or the process
263 # runs as root so it can change its UID. In both cases it means we have
264 # to find out its current UID.
266 uid
= os
.stat('/proc/%d' % self
.pid
)[stat
.ST_UID
]
268 # The process disappeared
271 # Maybe the process called setuid()
278 if uid
is not None and not self
.user
:
280 self
.user
= safe_utf8_decode(pwd
.getpwuid(uid
).pw_name
)
283 return self
.user
or '{none}'
285 def get_proc_status_name(self
):
287 first_line
= open('/proc/%d/status' % self
.pid
).readline()
289 return '{no such process}'
291 if first_line
.startswith(prefix
):
292 name
= first_line
[6:].strip()
301 def get_cmdline(self
):
302 # A process may exec, so we must always reread its cmdline
304 proc_cmdline
= open('/proc/%d/cmdline' % self
.pid
)
305 cmdline
= proc_cmdline
.read(4096)
307 return '{no such process}'
309 # Probably a kernel thread, get its name from /proc/PID/status
310 return self
.get_proc_status_name()
311 parts
= cmdline
.split('\0')
312 if parts
[0].startswith('/'):
313 first_command_char
= parts
[0].rfind('/') + 1
314 parts
[0] = parts
[0][first_command_char
:]
315 cmdline
= ' '.join(parts
).strip()
316 return safe_utf8_decode(cmdline
)
318 def did_some_io(self
, accumulated
):
320 return not self
.stats_accum
.is_all_zero()
321 for t
in self
.threads
.itervalues():
322 if not t
.stats_delta
.is_all_zero():
326 def get_ioprio(self
):
327 priorities
= set(t
.get_ioprio() for t
in self
.threads
.itervalues())
328 if len(priorities
) == 1:
329 return priorities
.pop()
def set_ioprio(self, ioprio_class, ioprio_data):
    """Apply the same I/O priority to every thread in self.threads.

    ioprio_class, ioprio_data -- forwarded unchanged to each thread's
    set_ioprio().

    Returns None; the per-thread results are discarded.
    """
    members = self.threads.itervalues()
    for member in members:
        member.set_ioprio(ioprio_class, ioprio_data)
def ioprio_sort_key(self):
    """Return ioprio.sort_key() applied to this process's get_ioprio()."""
    current = self.get_ioprio()
    return ioprio.sort_key(current)
339 def get_thread(self
, tid
, taskstats_connection
):
340 thread
= self
.threads
.get(tid
, None)
342 thread
= ThreadInfo(tid
, taskstats_connection
)
343 self
.threads
[tid
] = thread
346 def update_stats(self
):
347 stats_delta
= Stats
.build_all_zero()
348 for tid
, thread
in self
.threads
.items():
350 del self
.threads
[tid
]
352 stats_delta
.accumulate(thread
.stats_delta
, stats_delta
)
354 nr_threads
= len(self
.threads
)
358 stats_delta
.blkio_delay_total
/= nr_threads
359 stats_delta
.swapin_delay_total
/= nr_threads
361 self
.stats_delta
= stats_delta
362 self
.stats_accum
.accumulate(self
.stats_delta
, self
.stats_accum
)
366 class ProcessList(DumpableObject
):
367 def __init__(self
, taskstats_connection
, options
):
370 self
.taskstats_connection
= taskstats_connection
371 self
.options
= options
372 self
.timestamp
= time
.time()
373 self
.vmstat
= vmstat
.VmStat()
375 # A first time as we are interested in the delta
376 self
.update_process_counts()
378 def get_process(self
, pid
):
379 """Either get the specified PID from self.processes or build a new
380 ProcessInfo if we see this PID for the first time"""
381 process
= self
.processes
.get(pid
, None)
383 process
= ProcessInfo(pid
)
384 self
.processes
[pid
] = process
386 if process
.is_monitored(self
.options
):
389 def list_tgids(self
):
390 if self
.options
.pids
:
391 return self
.options
.pids
393 tgids
= os
.listdir('/proc')
394 if self
.options
.processes
:
395 return [int(tgid
) for tgid
in tgids
if '0' <= tgid
[0] <= '9']
399 if '0' <= tgid
[0] <= '9':
401 tids
.extend(map(int, os
.listdir('/proc/' + tgid
+ '/task')))
407 def list_tids(self
, tgid
):
408 if not self
.options
.processes
:
412 tids
= map(int, os
.listdir('/proc/%d/task' % tgid
))
416 if self
.options
.pids
:
417 tids
= list(set(self
.options
.pids
).intersection(set(tids
)))
421 def update_process_counts(self
):
422 new_timestamp
= time
.time()
423 self
.duration
= new_timestamp
- self
.timestamp
424 self
.timestamp
= new_timestamp
426 for tgid
in self
.list_tgids():
427 process
= self
.get_process(tgid
)
430 for tid
in self
.list_tids(tgid
):
431 thread
= process
.get_thread(tid
, self
.taskstats_connection
)
432 stats
= self
.taskstats_connection
.get_single_task_stats(thread
)
434 thread
.update_stats(stats
)
437 return self
.vmstat
.delta()
439 def refresh_processes(self
):
440 for process
in self
.processes
.itervalues():
441 for thread
in process
.threads
.itervalues():
444 total_read_and_write
= self
.update_process_counts()
446 for pid
, process
in self
.processes
.items():
447 if not process
.update_stats():
448 del self
.processes
[pid
]
450 return total_read_and_write