13 # Check for requirements:
14 # o Python >= 2.5 for AF_NETLINK sockets
15 # o Linux >= 2.6.20 with I/O accounting
20 except AttributeError:
23 ioaccounting
= os
.path
.exists('/proc/self/io')
25 if not python25
or not ioaccounting
:
26 def boolean2string(boolean
):
27 return boolean
and 'Found' or 'Not found'
28 print 'Could not run iotop as some of the requirements are not met:'
29 print '- Python >= 2.5 for AF_NETLINK support:', boolean2string(python25
)
30 print '- Linux >= 2.6.20 with I/O accounting support ' \
31 '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, ' \
32 'CONFIG_TASK_IO_ACCOUNTING):', \
33 boolean2string(ioaccounting
)
36 from iotop
import ioprio
, vmstat
37 from netlink
import Connection
, NETLINK_GENERIC
, U32Attr
, NLM_F_REQUEST
38 from genetlink
import Controller
, GeNlMessage
40 class DumpableObject(object):
41 """Base class for all objects that allows easy introspection when printed"""
43 return '%s: %s>' % (str(type(self
))[:-1], pprint
.pformat(self
.__dict
__))
47 # Interesting fields in a taskstats output
50 class Stats(DumpableObject
):
52 ('blkio_delay_total', 40),
53 ('swapin_delay_total', 56),
56 ('cancelled_write_bytes', 264)
59 has_blkio_delay_total
= False
61 def __init__(self
, task_stats_buffer
):
63 for name
, offset
in Stats
.members_offsets
:
64 data
= task_stats_buffer
[offset
:offset
+ 8]
65 sd
[name
] = struct
.unpack('Q', data
)[0]
67 # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
69 if not Stats
.has_blkio_delay_total
:
70 Stats
.has_blkio_delay_total
= self
.blkio_delay_total
!= 0
def accumulate(self, other_stats, destination, coeff=1):
    """Store ``self + coeff * other_stats`` into destination, member-wise.

    Only the members named in Stats.members_offsets are combined; each one
    is read from the instance dictionaries of self and other_stats and
    written into the instance dictionary of destination.  With the default
    coeff of 1 this is a plain sum.
    """
    own = self.__dict__
    theirs = other_stats.__dict__
    target = destination.__dict__
    # The offset half of each pair is only meaningful when parsing a raw
    # buffer; here just the member name is needed.
    for member, _offset in Stats.members_offsets:
        target[member] = own[member] + coeff * theirs[member]
def delta(self, other_stats, destination):
    """Store ``self - other_stats`` into destination.

    This is accumulate() with a coefficient of -1; whatever accumulate()
    returns is passed back unchanged.
    """
    negative_one = -1
    return self.accumulate(other_stats, destination, coeff=negative_one)
84 def is_all_zero(self
):
86 for name
, offset
in Stats
.members_offsets
:
93 stats
= Stats
.__new
__(Stats
)
95 for name
, offset
in Stats
.members_offsets
:
100 # Netlink usage for taskstats
103 TASKSTATS_CMD_GET
= 1
104 TASKSTATS_CMD_ATTR_PID
= 1
106 class TaskStatsNetlink(object):
107 # Keep in sync with format_stats() and pinfo.did_some_io()
def __init__(self, options):
    """Keep the options and prepare the netlink state used for requests.

    A Connection is created for NETLINK_GENERIC and stored on the
    instance; a Controller built on that connection is then asked for the
    id of the 'TASKSTATS' family, which is stored as self.family_id.
    """
    self.options = options
    netlink_connection = Connection(NETLINK_GENERIC)
    self.connection = netlink_connection
    self.family_id = Controller(netlink_connection).get_family_id('TASKSTATS')
115 def build_request(self
, tid
):
116 return GeNlMessage(self
.family_id
, cmd
=TASKSTATS_CMD_GET
,
117 attrs
=[U32Attr(TASKSTATS_CMD_ATTR_PID
, tid
)],
120 def get_single_task_stats(self
, thread
):
121 thread
.task_stats_request
.send(self
.connection
)
123 reply
= self
.connection
.recv()
125 if e
.errno
== errno
.ESRCH
:
126 # OSError: Netlink error: No such process (3)
129 if len(reply
.payload
) < 292:
132 reply_data
= reply
.payload
[20:]
134 reply_length
, reply_type
= struct
.unpack('HH', reply
.payload
[4:8])
135 reply_version
= struct
.unpack('H', reply
.payload
[20:22])[0]
136 assert reply_length
>= 288
137 assert reply_type
== TASKSTATS_CMD_ATTR_PID
+ 3
138 assert reply_version
>= 4
139 return Stats(reply_data
)
145 def find_uids(options
):
146 """Build options.uids from options.users by resolving usernames to UIDs"""
149 for u
in options
.users
or []:
154 passwd
= pwd
.getpwnam(u
)
156 print >> sys
.stderr
, 'Unknown user:', u
161 options
.uids
.append(uid
)
def safe_utf8_decode(s):
    """Decode s as UTF-8, falling back to an escaped form of s.

    When s is not valid UTF-8 the result is s encoded with the
    'string_escape' codec instead of a decoded string.
    """
    try:
        text = s.decode('utf-8')
    except UnicodeDecodeError:
        text = s.encode('string_escape')
    return text
171 class ThreadInfo(DumpableObject
):
172 """Stats for a single thread"""
173 def __init__(self
, tid
, taskstats_connection
):
176 self
.stats_total
= None
177 self
.stats_delta
= Stats
.__new
__(Stats
)
178 self
.task_stats_request
= taskstats_connection
.build_request(tid
)
def get_ioprio(self):
    """Return what ioprio.get() reports for this thread's tid."""
    tid = self.tid
    return ioprio.get(tid)
def set_ioprio(self, ioprio_class, ioprio_data):
    """Set this thread's I/O priority through ioprio.set_ioprio().

    The thread is addressed with ioprio.IOPRIO_WHO_PROCESS and self.tid;
    the helper's return value is passed back unchanged.
    """
    who = ioprio.IOPRIO_WHO_PROCESS
    target_tid = self.tid
    return ioprio.set_ioprio(who, target_tid, ioprio_class, ioprio_data)
def update_stats(self, stats):
    """Take a new sample and refresh self.stats_delta from it.

    stats.delta() is called with the previously stored sample and
    self.stats_delta as destination, after which stats becomes the stored
    sample (self.stats_total).

    self.stats_total is None until the first sample arrives.  That marker
    is tested with ``is None`` rather than by truthiness, so a valid sample
    object that happens to evaluate as false is still used as the baseline
    instead of being silently replaced.
    """
    if self.stats_total is None:
        # First sample: there is no baseline yet, so the sample is diffed
        # against itself.
        self.stats_total = stats
    stats.delta(self.stats_total, self.stats_delta)
    self.stats_total = stats
194 class ProcessInfo(DumpableObject
):
195 """Stats for a single process (a single line in the output): if
196 options.processes is set, it is a collection of threads, otherwise a single
198 def __init__(self
, pid
):
202 self
.threads
= {} # {tid: ThreadInfo}
203 self
.stats_delta
= Stats
.build_all_zero()
204 self
.stats_accum
= Stats
.build_all_zero()
205 self
.stats_accum_timestamp
= time
.time()
207 def is_monitored(self
, options
):
208 if (options
.pids
and not options
.processes
and
209 self
.pid
not in options
.pids
):
210 # We only monitor some threads, not this one
213 if options
.uids
and self
.get_uid() not in options
.uids
:
214 # We only monitor some users, not this one
222 # uid in (None, 0) means either we don't know the UID yet or the process
223 # runs as root so it can change its UID. In both cases it means we have
224 # to find out its current UID.
226 uid
= os
.stat('/proc/%d' % self
.pid
)[stat
.ST_UID
]
228 # The process disappeared
231 # Maybe the process called setuid()
238 if uid
is not None and not self
.user
:
240 self
.user
= safe_utf8_decode(pwd
.getpwuid(uid
).pw_name
)
243 return self
.user
or '{none}'
245 def get_proc_status_name(self
):
247 proc_status
= open('/proc/%d/status' % self
.pid
)
249 return '{no such process}'
250 first_line
= proc_status
.readline()
252 if first_line
.startswith(prefix
):
253 name
= first_line
[6:].strip()
262 def get_cmdline(self
):
263 # A process may exec, so we must always reread its cmdline
265 proc_cmdline
= open('/proc/%d/cmdline' % self
.pid
)
266 cmdline
= proc_cmdline
.read(4096)
268 return '{no such process}'
270 # Probably a kernel thread, get its name from /proc/PID/status
271 return self
.get_proc_status_name()
272 parts
= cmdline
.split('\0')
273 if parts
[0].startswith('/'):
274 first_command_char
= parts
[0].rfind('/') + 1
275 parts
[0] = parts
[0][first_command_char
:]
276 cmdline
= ' '.join(parts
).strip()
277 return safe_utf8_decode(cmdline
)
279 def did_some_io(self
, accumulated
):
281 return not self
.stats_accum
.is_all_zero()
282 return not all(t
.stats_delta
.is_all_zero() for
283 t
in self
.threads
.itervalues())
285 def get_ioprio(self
):
286 priorities
= set(t
.get_ioprio() for t
in self
.threads
.itervalues())
287 if len(priorities
) == 1:
288 return priorities
.pop()
def set_ioprio(self, ioprio_class, ioprio_data):
    """Forward the given I/O priority class and data to every thread
    currently held in self.threads."""
    members = self.threads.itervalues()
    for member in members:
        member.set_ioprio(ioprio_class, ioprio_data)
def ioprio_sort_key(self):
    """Return ioprio.sort_key() applied to this object's get_ioprio()."""
    current_priority = self.get_ioprio()
    return ioprio.sort_key(current_priority)
298 def get_thread(self
, tid
, taskstats_connection
):
299 thread
= self
.threads
.get(tid
, None)
301 thread
= ThreadInfo(tid
, taskstats_connection
)
302 self
.threads
[tid
] = thread
305 def update_stats(self
):
306 stats_delta
= Stats
.build_all_zero()
307 for tid
, thread
in self
.threads
.items():
309 del self
.threads
[tid
]
311 stats_delta
.accumulate(thread
.stats_delta
, stats_delta
)
313 nr_threads
= len(self
.threads
)
317 stats_delta
.blkio_delay_total
/= nr_threads
318 stats_delta
.swapin_delay_total
/= nr_threads
320 self
.stats_delta
= stats_delta
321 self
.stats_accum
.accumulate(self
.stats_delta
, self
.stats_accum
)
325 class ProcessList(DumpableObject
):
326 def __init__(self
, taskstats_connection
, options
):
329 self
.taskstats_connection
= taskstats_connection
330 self
.options
= options
331 self
.timestamp
= time
.time()
332 self
.vmstat
= vmstat
.VmStat()
334 # A first time as we are interested in the delta
335 self
.update_process_counts()
337 def get_process(self
, pid
):
338 """Either get the specified PID from self.processes or build a new
339 ProcessInfo if we see this PID for the first time"""
340 process
= self
.processes
.get(pid
, None)
342 process
= ProcessInfo(pid
)
343 self
.processes
[pid
] = process
345 if process
.is_monitored(self
.options
):
348 def list_tgids(self
):
349 if self
.options
.pids
:
350 return self
.options
.pids
352 tgids
= os
.listdir('/proc')
353 if self
.options
.processes
:
354 return [int(tgid
) for tgid
in tgids
if '0' <= tgid
[0] <= '9']
358 if '0' <= tgid
[0] <= '9':
360 tids
.extend(map(int, os
.listdir('/proc/' + tgid
+ '/task')))
366 def list_tids(self
, tgid
):
367 if not self
.options
.processes
:
371 tids
= map(int, os
.listdir('/proc/%d/task' % tgid
))
375 if self
.options
.pids
:
376 tids
= list(set(self
.options
.pids
).intersection(set(tids
)))
380 def update_process_counts(self
):
381 new_timestamp
= time
.time()
382 self
.duration
= new_timestamp
- self
.timestamp
383 self
.timestamp
= new_timestamp
385 for tgid
in self
.list_tgids():
386 process
= self
.get_process(tgid
)
389 for tid
in self
.list_tids(tgid
):
390 thread
= process
.get_thread(tid
, self
.taskstats_connection
)
391 stats
= self
.taskstats_connection
.get_single_task_stats(thread
)
393 thread
.update_stats(stats
)
396 return self
.vmstat
.delta()
398 def refresh_processes(self
):
399 for process
in self
.processes
.itervalues():
400 for thread
in process
.threads
.itervalues():
403 total_read_and_write
= self
.update_process_counts()
405 for pid
, process
in self
.processes
.items():
406 if not process
.update_stats():
407 del self
.processes
[pid
]
409 return total_read_and_write