Document the new python requirements
[iotop.git] / iotop / data.py
blobbbe3e277276da46ae1d9a694e4c1ae2741fba320
1 import errno
2 import glob
3 import os
4 import pprint
5 import pwd
6 import socket
7 import stat
8 import struct
9 import sys
10 import time
# Requirements verified when this module is imported:
#   o Linux >= 2.6.20 with I/O accounting
#   o Python >= 2.5 or Python 2.4 + ctypes

# The presence of /proc/self/io is what is used as evidence of I/O accounting.
ioaccounting = os.path.exists('/proc/self/io')

# Assume ctypes is there and downgrade the flag if the import fails.
has_ctypes = True
try:
    import ctypes
except ImportError:
    has_ctypes = False

if not (ioaccounting and has_ctypes):
    def boolean2string(boolean):
        """Render the outcome of one requirement check for the report."""
        return boolean and 'Found' or 'Not found'

    def report(*fields):
        """Write the fields on one stdout line, separated by single spaces."""
        sys.stdout.write(' '.join(fields) + '\n')

    report('Could not run iotop as some of the requirements are not met:')
    report('- Linux >= 2.6.20 with I/O accounting support '
           '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
           'CONFIG_TASK_IO_ACCOUNTING):', boolean2string(ioaccounting))
    report('- Python >= 2.5 or Python 2.4 with the ctypes module:',
           boolean2string(has_ctypes))

    # Nothing below can work without both requirements.
    sys.exit(1)
38 from iotop import ioprio, vmstat
39 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
40 from genetlink import Controller, GeNlMessage
class DumpableObject(object):
    """Base class giving subclasses a repr that shows all instance attributes"""

    def __repr__(self):
        # str(type(self)) ends with '>': drop it here and re-add it after the
        # pretty-printed attribute dictionary.
        type_label = str(type(self))[:-1]
        attributes = pprint.pformat(self.__dict__)
        return '%s: %s>' % (type_label, attributes)
49 # Interesting fields in a taskstats output
class Stats(DumpableObject):
    """Counters read out of one taskstats buffer, stored as plain attributes"""

    # (attribute name, byte offset of its 8-byte value inside the buffer)
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    # Class-wide flag: turns True once any parsed buffer carried a non-zero
    # blkio_delay_total.
    has_blkio_delay_total = False

    def __init__(self, task_stats_buffer):
        """Unpack every member listed in members_offsets from the buffer"""
        for member, start in Stats.members_offsets:
            raw = task_stats_buffer[start:start + 8]
            (value,) = struct.unpack('Q', raw)
            self.__dict__[member] = value

        # This is a heuristic to detect if CONFIG_TASK_DELAY_ACCT is enabled in
        # the kernel.
        if self.blkio_delay_total != 0:
            Stats.has_blkio_delay_total = True

    def accumulate(self, other_stats, destination, coeff=1):
        """Store self + coeff * other_stats into destination, per member"""
        target = destination.__dict__
        mine = self.__dict__
        theirs = other_stats.__dict__
        for member, _offset in Stats.members_offsets:
            target[member] = mine[member] + coeff * theirs[member]

    def delta(self, other_stats, destination):
        """Store self - other_stats into destination"""
        return self.accumulate(other_stats, destination, coeff=-1)

    def is_all_zero(self):
        """Return True when every tracked member is 0"""
        values = self.__dict__
        for member, _offset in Stats.members_offsets:
            if values[member] != 0:
                return False
        return True

    @staticmethod
    def build_all_zero():
        """Return a Stats whose tracked members are all 0, without parsing"""
        blank = Stats.__new__(Stats)
        blank.__dict__.update(
            [(member, 0) for member, _offset in Stats.members_offsets])
        return blank
102 # Netlink usage for taskstats
# Command number passed as `cmd` when building a taskstats request.
TASKSTATS_CMD_GET = 1
# Type of the U32 attribute that carries the task id in that request.
TASKSTATS_CMD_ATTR_PID = 1


class TaskStatsNetlink(object):
    """Requests per-task statistics over a generic netlink connection"""
    # Keep in sync with format_stats() and pinfo.did_some_io()

    def __init__(self, options):
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        # The family id is looked up once and reused for every request.
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def build_request(self, tid):
        """Return the message asking for the statistics of task `tid`"""
        tid_attribute = U32Attr(TASKSTATS_CMD_ATTR_PID, tid)
        return GeNlMessage(self.family_id,
                           cmd=TASKSTATS_CMD_GET,
                           attrs=[tid_attribute],
                           flags=NLM_F_REQUEST)

    def get_single_task_stats(self, thread):
        """Send the thread's prepared request and parse the reply.

        Returns a Stats object, or None when the task no longer exists or
        the reply is too short to be parsed.
        """
        thread.task_stats_request.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            # sys.exc_info() instead of binding the exception in the except
            # clause: that spelling parses on every Python version.
            error = sys.exc_info()[1]
            if error.errno != errno.ESRCH:
                raise
            # OSError: Netlink error: No such process (3)
            return None

        payload = reply.payload
        if len(payload) < 292:
            # Short reply
            return None

        # NOTE(review): the 4/20 offsets and the 288/292 sizes presumably
        # follow the kernel's taskstats reply layout -- confirm against the
        # netlink headers before changing them.
        reply_length, reply_type = struct.unpack('HH', payload[4:8])
        (reply_version,) = struct.unpack('H', payload[20:22])
        assert reply_length >= 288
        assert reply_type == TASKSTATS_CMD_ATTR_PID + 3
        assert reply_version >= 4
        return Stats(payload[20:])
144 # PIDs manipulations
def find_uids(options):
    """Fill options.uids with the numeric UID of every entry of options.users.

    Entries that parse as integers are taken as UIDs, the others are looked up
    as user names. Each unknown name is reported on stderr and the program
    exits with status 1 once all entries have been examined.
    """
    options.uids = []
    unknown_user_seen = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                sys.stderr.write('Unknown user: %s\n' % (user,))
                unknown_user_seen = True
        # Once a lookup has failed the list is no longer extended: the
        # program is about to exit anyway.
        if not unknown_user_seen:
            options.uids.append(uid)
    if unknown_user_seen:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode s as UTF-8, or return its 'string_escape' encoding when the
    bytes are not valid UTF-8."""
    try:
        decoded = s.decode('utf-8')
    except UnicodeDecodeError:
        decoded = s.encode('string_escape')
    return decoded
class ThreadInfo(DumpableObject):
    """Stats for a single thread"""

    def __init__(self, tid, taskstats_connection):
        """Prepare the bookkeeping and the taskstats request for thread `tid`.

        taskstats_connection must provide build_request(tid); the message it
        returns is kept so it can be re-sent at every refresh.
        """
        self.tid = tid
        # ProcessList sets the mark before a refresh and clears it when fresh
        # stats arrive; threads still marked afterwards get dropped.
        self.mark = True
        self.stats_total = None
        # Start from real zeroes: a bare Stats.__new__(Stats) has no members
        # at all, so is_all_zero()/accumulate() on a thread that never
        # received stats would raise KeyError.
        self.stats_delta = Stats.build_all_zero()
        self.task_stats_request = taskstats_connection.build_request(tid)

    def get_ioprio(self):
        """Return the I/O priority of this thread as reported by ioprio.get()"""
        return ioprio.get(self.tid)

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Set the I/O priority class and data of this thread"""
        return ioprio.set_ioprio(ioprio.IOPRIO_WHO_PROCESS, self.tid,
                                 ioprio_class, ioprio_data)

    def update_stats(self, stats):
        """Record `stats` as the latest totals and refresh stats_delta.

        stats_delta receives `stats` minus the previously recorded totals;
        the very first sample is compared to itself, giving an all-zero delta.
        """
        if self.stats_total is None:
            self.stats_total = stats
        stats.delta(self.stats_total, self.stats_delta)
        self.stats_total = stats
class ProcessInfo(DumpableObject):
    """Stats for a single process (a single line in the output): if
    options.processes is set, it is a collection of threads, otherwise a single
    thread."""

    def __init__(self, pid):
        self.pid = pid
        self.uid = None
        self.user = None
        self.threads = {}  # {tid: ThreadInfo}
        self.stats_delta = Stats.build_all_zero()
        self.stats_accum = Stats.build_all_zero()
        self.stats_accum_timestamp = time.time()

    def is_monitored(self, options):
        """Tell whether this entry passes the pid and uid filters of options"""
        if (options.pids and not options.processes and
            self.pid not in options.pids):
            # We only monitor some threads, not this one
            return False

        if options.uids and self.get_uid() not in options.uids:
            # We only monitor some users, not this one
            return False

        return True

    def get_uid(self):
        """Return the UID owning /proc/<pid>, or None if it cannot be read"""
        if self.uid:
            return self.uid
        # uid in (None, 0) means either we don't know the UID yet or the process
        # runs as root so it can change its UID. In both cases it means we have
        # to find out its current UID.
        try:
            uid = os.stat('/proc/%d' % self.pid)[stat.ST_UID]
        except OSError:
            # The process disappeared
            uid = None
        if uid != self.uid:
            # Maybe the process called setuid()
            self.user = None
            self.uid = uid
        return uid

    def get_user(self):
        """Return the owner's user name, the UID as a string when it has no
        passwd entry, or '{none}' when the UID is unknown"""
        uid = self.get_uid()
        if uid is not None and not self.user:
            try:
                self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
            except KeyError:
                self.user = str(uid)
        return self.user or '{none}'

    def get_proc_status_name(self):
        """Return '[name]' built from the first line of /proc/<pid>/status"""
        try:
            status_file = open('/proc/%d/status' % self.pid)
            # Close the file explicitly instead of relying on garbage
            # collection to release the descriptor.
            try:
                first_line = status_file.readline()
            finally:
                status_file.close()
        except IOError:
            return '{no such process}'
        prefix = 'Name:\t'
        if first_line.startswith(prefix):
            name = first_line[len(prefix):].strip()
        else:
            name = ''
        if name:
            name = '[%s]' % name
        else:
            name = '{no name}'
        return name

    def get_cmdline(self):
        """Return the command line for display, with the directory part of
        the executable removed; falls back to the name from
        /proc/<pid>/status when the command line is empty"""
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            # Close the file explicitly instead of relying on garbage
            # collection to release the descriptor.
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        if not cmdline:
            # Probably a kernel thread, get its name from /proc/PID/status
            return self.get_proc_status_name()
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline)

    def did_some_io(self, accumulated):
        """Tell whether any counter is non-zero, either in the accumulated
        stats or in the latest delta of at least one thread"""
        if accumulated:
            return not self.stats_accum.is_all_zero()
        for t in self.threads.values():
            if not t.stats_delta.is_all_zero():
                return True
        return False

    def get_ioprio(self):
        """Return the I/O priority shared by all threads, or '?dif' when the
        threads do not all report the same one"""
        priorities = set(t.get_ioprio() for t in self.threads.values())
        if len(priorities) == 1:
            return priorities.pop()
        return '?dif'

    def set_ioprio(self, ioprio_class, ioprio_data):
        """Apply the given I/O priority to every thread of this entry"""
        for thread in self.threads.values():
            thread.set_ioprio(ioprio_class, ioprio_data)

    def ioprio_sort_key(self):
        return ioprio.sort_key(self.get_ioprio())

    def get_thread(self, tid, taskstats_connection):
        """Return the ThreadInfo for tid, creating and registering it first
        if this tid has not been seen yet"""
        thread = self.threads.get(tid, None)
        if not thread:
            thread = ThreadInfo(tid, taskstats_connection)
            self.threads[tid] = thread
        return thread

    def update_stats(self):
        """Drop the threads still marked, then rebuild stats_delta from the
        remaining ones and add it to stats_accum.

        Returns False when no thread is left, True otherwise.
        """
        stats_delta = Stats.build_all_zero()
        # Iterate over a snapshot: entries are deleted while looping.
        for tid, thread in list(self.threads.items()):
            if thread.mark:
                del self.threads[tid]
            else:
                stats_delta.accumulate(thread.stats_delta, stats_delta)

        nr_threads = len(self.threads)
        if not nr_threads:
            return False

        # The two delays are averaged over the threads; '//' spells out the
        # integer division the counters have always been given here.
        stats_delta.blkio_delay_total //= nr_threads
        stats_delta.swapin_delay_total //= nr_threads

        self.stats_delta = stats_delta
        self.stats_accum.accumulate(self.stats_delta, self.stats_accum)

        return True
class ProcessList(DumpableObject):
    """The monitored processes, refreshed from /proc and taskstats"""

    def __init__(self, taskstats_connection, options):
        # {pid: ProcessInfo}
        self.processes = {}
        self.taskstats_connection = taskstats_connection
        self.options = options
        self.timestamp = time.time()
        self.vmstat = vmstat.VmStat()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Return the ProcessInfo for pid, building and registering it on
        first sight; returns None when the process is not monitored"""
        process = self.processes.get(pid)
        if not process:
            process = self.processes[pid] = ProcessInfo(pid)

        if not process.is_monitored(self.options):
            return None
        return process

    def list_tgids(self):
        """Return the ids to visit: options.pids when given, otherwise the
        numeric entries of /proc (process mode) or all their task ids"""
        options = self.options
        if options.pids:
            return options.pids

        # Only the /proc entries starting with a digit are processes.
        numeric_entries = [entry for entry in os.listdir('/proc')
                           if '0' <= entry[0] <= '9']
        if options.processes:
            return [int(entry) for entry in numeric_entries]

        tids = []
        for tgid in numeric_entries:
            try:
                task_names = os.listdir('/proc/' + tgid + '/task')
            except OSError:
                # The PID went away
                continue
            tids.extend([int(name) for name in task_names])
        return tids

    def list_tids(self, tgid):
        """Return the thread ids to query for tgid: tgid itself outside
        process mode, else its tasks (restricted to options.pids if set)"""
        if not self.options.processes:
            return [tgid]

        try:
            task_names = os.listdir('/proc/%d/task' % tgid)
        except OSError:
            return []
        tids = [int(name) for name in task_names]

        wanted = self.options.pids
        if wanted:
            tids = list(set(wanted).intersection(set(tids)))

        return tids

    def update_process_counts(self):
        """Query the stats of every monitored thread and return
        self.vmstat.delta(); also updates self.duration and self.timestamp"""
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        connection = self.taskstats_connection
        for tgid in self.list_tgids():
            process = self.get_process(tgid)
            if process:
                for tid in self.list_tids(tgid):
                    thread = process.get_thread(tid, connection)
                    stats = connection.get_single_task_stats(thread)
                    if stats:
                        thread.update_stats(stats)
                        # Unmarked threads survive the next cleanup.
                        thread.mark = False

        return self.vmstat.delta()

    def refresh_processes(self):
        """Run one refresh cycle and return what update_process_counts()
        returned; processes left without threads are forgotten"""
        # Mark everything; threads that answer get unmarked again.
        for process in self.processes.values():
            for thread in process.threads.values():
                thread.mark = True

        total_read_and_write = self.update_process_counts()

        # Snapshot the items: entries are deleted while looping.
        for pid, process in list(self.processes.items()):
            if not process.update_stats():
                del self.processes[pid]

        return total_read_and_write

    def clear(self):
        """Forget every known process"""
        self.processes = {}