12 from iotop
import ioprio
, vmstat
13 from netlink
import Connection
, NETLINK_GENERIC
, U32Attr
, NLM_F_REQUEST
14 from genetlink
import Controller
, GeNlMessage
17 # Check for requirements:
18 # o Python >= 2.5 for AF_NETLINK sockets
19 # o Linux >= 2.6.20 with I/O accounting
24 except AttributeError:
27 ioaccounting
= os
.path
.exists('/proc/self/io')
29 if not python25
or not ioaccounting
:
30 def boolean2string(boolean
):
31 return boolean
and 'Found' or 'Not found'
32 print 'Could not run iotop as some of the requirements are not met:'
33 print '- Python >= 2.5 for AF_NETLINK support:', boolean2string(python25
)
34 print '- Linux >= 2.6.20 with I/O accounting support ' \
35 '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
36 'CONFIG_TASK_IO_ACCOUNTING):', \
37 boolean2string(ioaccounting
)
40 class DumpableObject(object):
41 """Base class for all objects that allows easy introspection when printed"""
43 return '%s: %s>' % (str(type(self
))[:-1], pprint
.pformat(self
.__dict
__))
47 # Interesting fields in a taskstats output
50 class Stats(DumpableObject
):
52 ('blkio_delay_total', 40),
53 ('swapin_delay_total', 56),
56 ('cancelled_write_bytes', 264)
59 def __init__(self
, task_stats_buffer
):
60 for name
, offset
in Stats
.members_offsets
:
61 data
= task_stats_buffer
[offset
:offset
+ 8]
62 setattr(self
, name
, struct
.unpack('Q', data
)[0])
64 def accumulate(self
, other_stats
, operator
=sum):
65 """Returns a new Stats object built from operator(self, other_stats)"""
66 delta_stats
= Stats
.__new
__(Stats
)
67 for name
, offset
in Stats
.members_offsets
:
68 self_value
= getattr(self
, name
)
69 other_value
= getattr(other_stats
, name
)
70 setattr(delta_stats
, name
, operator((self_value
, other_value
)))
73 def delta(self
, other_stats
):
74 """Returns self - other_stats"""
75 def subtract((me
, other
)):
77 return self
.accumulate(other_stats
, operator
=subtract
)
79 def is_all_zero(self
):
80 for name
, offset
in Stats
.members_offsets
:
81 if getattr(self
, name
) != 0:
87 stats
= Stats
.__new
__(Stats
)
88 for name
, offset
in Stats
.members_offsets
:
89 setattr(stats
, name
, 0)
93 # Netlink usage for taskstats
97 TASKSTATS_CMD_ATTR_PID
= 1
99 class TaskStatsNetlink(object):
100 # Keep in sync with human_stats(stats, duration) and pinfo.did_some_io()
102 def __init__(self
, options
):
103 self
.options
= options
104 self
.connection
= Connection(NETLINK_GENERIC
)
105 controller
= Controller(self
.connection
)
106 self
.family_id
= controller
.get_family_id('TASKSTATS')
108 def get_single_task_stats(self
, pid
):
109 request
= GeNlMessage(self
.family_id
, cmd
=TASKSTATS_CMD_GET
,
110 attrs
=[U32Attr(TASKSTATS_CMD_ATTR_PID
, pid
)],
112 request
.send(self
.connection
)
114 reply
= self
.connection
.recv()
116 if e
.errno
== errno
.ESRCH
:
117 # OSError: Netlink error: No such process (3)
120 if len(reply
.payload
) < 292:
123 reply_data
= reply
.payload
[20:]
125 reply_length
, reply_type
= struct
.unpack('HH', reply
.payload
[4:8])
126 reply_version
= struct
.unpack('H', reply
.payload
[20:22])[0]
127 assert reply_length
>= 288
128 assert reply_type
== TASKSTATS_CMD_ATTR_PID
+ 3
129 assert reply_version
>= 4
130 return Stats(reply_data
)
136 def find_uids(options
):
137 """Build options.uids from options.users by resolving usernames to UIDs"""
140 for u
in options
.users
or []:
145 passwd
= pwd
.getpwnam(u
)
147 print >> sys
.stderr
, 'Unknown user:', u
152 options
.uids
.append(uid
)
156 def safe_utf8_decode(s
):
158 return s
.decode('utf-8')
159 except UnicodeDecodeError:
160 return s
.encode('string_escape')
162 class ThreadInfo(DumpableObject
):
163 """Stats for a single thread"""
164 def __init__(self
, tid
):
167 self
.stats_total
= None
168 self
.stats_delta
= None
170 def get_ioprio(self
):
171 return ioprio
.get(self
.tid
)
173 def update_stats(self
, stats
):
174 if not self
.stats_total
:
175 self
.stats_total
= stats
176 self
.stats_delta
= stats
.delta(self
.stats_total
)
177 self
.stats_total
= stats
180 class ProcessInfo(DumpableObject
):
181 """Stats for a single process (a single line in the output): if
182 options.processes is set, it is a collection of threads, otherwise a single
184 def __init__(self
, pid
):
188 self
.threads
= {} # {tid: ThreadInfo}
189 self
.stats_delta
= Stats
.build_all_zero()
190 self
.stats_accum
= Stats
.build_all_zero()
191 self
.stats_accum_timestamp
= time
.time()
193 def is_monitored(self
, options
):
194 if (options
.pids
and not options
.processes
and
195 self
.pid
not in options
.pids
):
196 # We only monitor some threads, not this one
199 if options
.uids
and self
.get_uid() not in options
.uids
:
200 # We only monitor some users, not this one
208 # uid in (None, 0) means either we don't know the UID yet or the process
209 # runs as root so it can change its UID. In both cases it means we have
210 # to find out its current UID.
212 uid
= os
.stat('/proc/%d' % self
.pid
)[stat
.ST_UID
]
214 # The process disappeared
217 # Maybe the process called setuid()
223 if uid
is not None and not self
.user
:
225 self
.user
= safe_utf8_decode(pwd
.getpwuid(uid
).pw_name
)
228 return self
.user
or '{none}'
230 def get_proc_status_name(self
):
232 proc_status
= open('/proc/%d/status' % self
.pid
)
234 return '{no such process}'
235 first_line
= proc_status
.readline()
237 if first_line
.startswith(prefix
):
238 name
= first_line
[6:].strip()
247 def get_cmdline(self
):
248 # A process may exec, so we must always reread its cmdline
250 proc_cmdline
= open('/proc/%d/cmdline' % self
.pid
)
251 cmdline
= proc_cmdline
.read(4096)
253 return '{no such process}'
255 # Probably a kernel thread, get its name from /proc/PID/status
256 return self
.get_proc_status_name()
257 parts
= cmdline
.split('\0')
258 if parts
[0].startswith('/'):
259 first_command_char
= parts
[0].rfind('/') + 1
260 parts
[0] = parts
[0][first_command_char
:]
261 cmdline
= ' '.join(parts
).strip()
262 return safe_utf8_decode(cmdline
)
264 def did_some_io(self
):
265 return not all(t
.stats_delta
.is_all_zero() for
266 t
in self
.threads
.itervalues())
268 def get_ioprio(self
):
269 priorities
= set(t
.get_ioprio() for t
in self
.threads
.itervalues())
270 if len(priorities
) == 1:
271 return priorities
.pop()
274 def ioprio_sort_key(self
):
275 return ioprio
.sort_key(self
.get_ioprio())
277 def get_thread(self
, tid
):
278 thread
= self
.threads
.get(tid
, None)
280 thread
= ThreadInfo(tid
)
281 self
.threads
[tid
] = thread
284 def update_stats(self
):
285 stats_delta
= Stats
.build_all_zero()
286 for tid
, thread
in self
.threads
.items():
288 del self
.threads
[tid
]
290 stats_delta
= stats_delta
.accumulate(thread
.stats_delta
)
292 nr_threads
= len(self
.threads
)
296 stats_delta
.blkio_delay_total
/= nr_threads
297 stats_delta
.swapin_delay_total
/= nr_threads
299 self
.stats_delta
= stats_delta
300 self
.stats_accum
= self
.stats_accum
.accumulate(self
.stats_delta
)
304 class ProcessList(DumpableObject
):
305 def __init__(self
, taskstats_connection
, options
):
308 self
.taskstats_connection
= taskstats_connection
309 self
.options
= options
310 self
.timestamp
= time
.time()
311 self
.vmstat
= vmstat
.VmStat()
313 # A first time as we are interested in the delta
314 self
.update_process_counts()
316 def get_process(self
, pid
):
317 """Either get the specified PID from self.processes or build a new
318 ProcessInfo if we see this PID for the first time"""
319 process
= self
.processes
.get(pid
, None)
321 process
= ProcessInfo(pid
)
322 self
.processes
[pid
] = process
324 if process
.is_monitored(self
.options
):
327 def list_tgids(self
):
328 if self
.options
.pids
:
329 for pid
in self
.options
.pids
:
332 pattern
= '/proc/[0-9]*'
333 if not self
.options
.processes
:
336 for path
in glob
.iglob(pattern
):
337 yield int(os
.path
.basename(path
))
339 def list_tids(self
, tgid
):
340 if not self
.options
.processes
:
344 tids
= map(int, os
.listdir('/proc/%d/task' % tgid
))
348 if self
.options
.pids
:
349 tids
= list(set(self
.options
.pids
).intersection(set(tids
)))
353 def update_process_counts(self
):
354 new_timestamp
= time
.time()
355 self
.duration
= new_timestamp
- self
.timestamp
356 self
.timestamp
= new_timestamp
358 for tgid
in self
.list_tgids():
359 process
= self
.get_process(tgid
)
362 for tid
in self
.list_tids(tgid
):
363 thread
= process
.get_thread(tid
)
364 stats
= self
.taskstats_connection
.get_single_task_stats(tid
)
366 thread
.update_stats(stats
)
369 return self
.vmstat
.delta()
371 def refresh_processes(self
):
372 for process
in self
.processes
.itervalues():
373 for thread
in process
.threads
.itervalues():
376 total_read_and_write
= self
.update_process_counts()
378 for pid
, process
in self
.processes
.items():
379 if not process
.update_stats():
380 del self
.processes
[pid
]
382 return total_read_and_write