The I/O priority can be dynamically changed, so we must re-fetch it
[iotop.git] / iotop / data.py
blobde127e254ddb4df0ec8de6a65a8c032637becf0a
1 import errno
2 import os
3 import pwd
4 import socket
5 import struct
6 import sys
7 import time
9 from iotop import ioprio
10 from netlink import Connection, NETLINK_GENERIC, U32Attr, NLM_F_REQUEST
11 from genetlink import Controller, GeNlMessage
# Requirements probed once, when this module is imported:
#   o Python >= 2.5 for AF_NETLINK sockets
#   o Linux >= 2.6.20 with I/O accounting

# The socket module only exposes NETLINK_ROUTE when netlink is supported.
python25 = hasattr(socket, 'NETLINK_ROUTE')

# /proc/<pid>/io is only present when the kernel does per-task I/O accounting.
ioaccounting = os.path.exists('/proc/self/io')

if not (python25 and ioaccounting):
    def boolean2string(boolean):
        # Plain if/return: this code must stay parseable by the very
        # interpreters the check above is meant to reject.
        if boolean:
            return 'Found'
        return 'Not found'

    report = [
        'Could not run iotop as some of the requirements are not met:',
        ' '.join(('- Python >= 2.5 for AF_NETLINK support:',
                  boolean2string(python25))),
        ' '.join(('- Linux >= 2.6.20 with I/O accounting support '
                  '(CONFIG_TASKSTATS, CONFIG_TASK_DELAY_ACCT, '
                  'CONFIG_TASK_IO_ACCOUNTING):',
                  boolean2string(ioaccounting))),
    ]
    sys.stdout.write('\n'.join(report) + '\n')
    sys.exit(1)
38 # Interesting fields in a taskstats output
class Stats(object):
    """The counters this module tracks for one task.

    Every instance carries one numeric attribute per entry of
    ``members_offsets``.
    """

    # (attribute name, byte offset inside the buffer given to __init__).
    # Each field is decoded with struct format 'Q': one unsigned 64-bit
    # integer in native byte order.
    members_offsets = [
        ('blkio_delay_total', 40),
        ('swapin_delay_total', 56),
        ('read_bytes', 248),
        ('write_bytes', 256),
        ('cancelled_write_bytes', 264)
    ]

    def __init__(self, task_stats_buffer):
        """Decode every tracked field out of *task_stats_buffer*.

        Raises struct.error when the buffer is too short for a field.
        """
        for field, start in Stats.members_offsets:
            value, = struct.unpack('Q', task_stats_buffer[start:start + 8])
            setattr(self, field, value)

    def accumulate(self, other_stats, operator=sum):
        """Combine self and *other_stats* field by field into a new Stats.

        *operator* is called once per field with the pair
        ``(own value, other value)``; the default adds the two.
        """
        # __new__ skips __init__, which would require a raw buffer.
        combined = Stats.__new__(Stats)
        for field, _ in Stats.members_offsets:
            pair = (getattr(self, field), getattr(other_stats, field))
            setattr(combined, field, operator(pair))
        return combined

    def delta(self, other_stats):
        """Return a new Stats holding ``self - other_stats`` per field."""
        return self.accumulate(other_stats,
                               operator=lambda pair: pair[0] - pair[1])

    def is_all_zero(self):
        """Tell whether every tracked field equals 0."""
        return all(getattr(self, field) == 0
                   for field, _ in Stats.members_offsets)

    @staticmethod
    def build_all_zero():
        """Return a Stats whose tracked fields are all 0."""
        zeroed = Stats.__new__(Stats)
        for field, _ in Stats.members_offsets:
            setattr(zeroed, field, 0)
        return zeroed
82 # Netlink usage for taskstats
# Numbers used when querying the TASKSTATS generic-netlink family
# (presumably mirroring <linux/taskstats.h> -- verify against the kernel
# headers before changing them).
TASKSTATS_CMD_GET = 1        # command sent to request a task's statistics
TASKSTATS_CMD_ATTR_PID = 1   # attribute type carrying the queried pid
class TaskStatsNetlink(object):
    """Queries per-task statistics over a generic-netlink connection."""
    # Keep in sync with human_stats(stats, duration) and pinfo.did_some_io()

    def __init__(self, options):
        """Open the netlink connection and resolve the TASKSTATS family id."""
        self.options = options
        self.connection = Connection(NETLINK_GENERIC)
        self.family_id = Controller(self.connection).get_family_id('TASKSTATS')

    def get_single_task_stats(self, pid):
        """Return the Stats of the single task *pid*, or None.

        None is returned when receiving the reply fails with ESRCH or when
        the reply payload is shorter than 292 bytes.  Any other OSError is
        re-raised, and an unexpected reply header trips an assertion.
        """
        query = GeNlMessage(self.family_id, cmd=TASKSTATS_CMD_GET,
                            attrs=[U32Attr(TASKSTATS_CMD_ATTR_PID, pid)],
                            flags=NLM_F_REQUEST)
        query.send(self.connection)
        try:
            reply = self.connection.recv()
        except OSError:
            # OSError: Netlink error: No such process (3)
            if sys.exc_info()[1].errno != errno.ESRCH:
                raise
            return None

        payload = reply.payload
        if len(payload) < 292:
            # Short reply
            return None

        # Sanity-check the header fields that precede the statistics.
        reply_length, reply_type = struct.unpack('HH', payload[4:8])
        reply_version, = struct.unpack('H', payload[20:22])
        assert reply_length >= 288
        assert reply_type == TASKSTATS_CMD_ATTR_PID + 3
        assert reply_version >= 4
        # Stats offsets are relative to byte 20 of the payload.
        return Stats(payload[20:])

    def get_task_stats(self, pid):
        """Return the Stats for *pid*, or None when nothing could be read.

        With ``options.processes`` set, the stats of every task listed under
        /proc/<pid>/task are accumulated, and the two delay totals are then
        divided by the number of tasks that returned stats.
        """
        if not self.options.processes:
            tids = [pid]
        else:
            # We don't use TASKSTATS_CMD_ATTR_TGID as it's only half
            # implemented in the kernel
            try:
                tids = [int(entry)
                        for entry in os.listdir('/proc/%d/task' % pid)]
            except OSError:
                # Pid not found
                tids = []

        collected = []
        for tid in tids:
            single = self.get_single_task_stats(tid)
            if single:
                collected.append(single)
        if not collected:
            return None

        total = collected[0]
        for extra in collected[1:]:
            total = total.accumulate(extra)
        answered = len(collected)
        total.blkio_delay_total /= answered
        total.swapin_delay_total /= answered
        return total
145 # PIDs manipulations
def find_uids(options):
    """Resolve ``options.users`` into numeric uids stored in ``options.uids``.

    Each entry is either a numeric uid or a login name looked up in the
    password database.  Unknown names are reported on stderr; once the whole
    list has been examined the process exits with status 1 if any lookup
    failed.
    """
    resolved = []
    options.uids = resolved
    failed = False
    for user in options.users or []:
        try:
            uid = int(user)
        except ValueError:
            # Not a number: treat it as a login name.
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                sys.stderr.write('%s %s\n' % ('Unknown user:', user))
                failed = True
        # After the first failure nothing more is recorded; the remaining
        # entries are still checked so every unknown name gets reported.
        if not failed:
            resolved.append(uid)
    if failed:
        sys.exit(1)
def safe_utf8_decode(s):
    """Decode *s* as UTF-8, falling back to its string-escaped form.

    The fallback is used only when *s* is not valid UTF-8.
    """
    try:
        text = s.decode('utf-8')
    except UnicodeDecodeError:
        text = s.encode('string_escape')
    return text
class pinfo(object):
    """Bookkeeping for one task: identity, owner, priority and I/O counters."""

    def __init__(self, pid, options):
        """Create the record for *pid* and read its /proc status file.

        Raises IOError when /proc/<pid>/status cannot be opened or read.
        """
        # Refresh marker; ProcessList sets and clears it around each update.
        self.mark = False
        self.pid = pid
        # Last absolute counters seen, and their change since the sample
        # before that.
        self.stats_total = Stats.build_all_zero()
        self.stats_delta = Stats.build_all_zero()
        self.parse_status('/proc/%d/status' % pid, options)

    def check_if_valid(self, uid, options):
        """Set ``self.valid`` according to the pid/uid filters in *options*.

        The task is kept (truthy value) when explicit pids were requested,
        when no uid filter is set, or when *uid* is one of the filtered uids.
        """
        self.valid = options.pids or not options.uids or uid in options.uids

    def parse_status(self, path, options):
        """Read the ``Name:`` and ``Uid:`` lines of the status file at *path*.

        Sets ``self.name``, ``self.user`` and (via check_if_valid)
        ``self.valid``.  Parsing stops at the ``Uid:`` line.
        """
        status_file = open(path)
        # Close the file explicitly instead of relying on garbage
        # collection: this runs for every task on every refresh.
        try:
            for line in status_file:
                if line.startswith('Name:'):
                    # Name kernel threads
                    split = line.split()
                    if len(split) > 1:
                        self.name = '[' + ' '.join(split[1:]).strip() + ']'
                    else:
                        self.name = '(unnamed kernel thread)'
                elif line.startswith('Uid:'):
                    # Only the first number on the line is used.
                    uid = int(line.split()[1])
                    # We check monitored PIDs only here
                    self.check_if_valid(uid, options)
                    try:
                        self.user = safe_utf8_decode(pwd.getpwuid(uid).pw_name)
                    except KeyError:
                        # No passwd entry: show the numeric uid instead.
                        self.user = str(uid)
                    break
        finally:
            status_file.close()

    def add_stats(self, stats):
        """Record a new sample and update the delta against the previous one."""
        self.stats_timestamp = time.time()
        self.stats_delta = stats.delta(self.stats_total)
        self.stats_total = stats
        # The I/O priority can be dynamically changed, so we must re-fetch it
        self.ioprio = ioprio.get(self.pid)

    def get_cmdline(self):
        """Return the task's command line for display.

        The directory part of an absolute executable path is dropped.  When
        the command line is empty, the bracketed name from the status file is
        used instead.  Returns ``'{no such process}'`` if /proc/<pid>/cmdline
        cannot be opened or read.
        """
        # A process may exec, so we must always reread its cmdline
        try:
            proc_cmdline = open('/proc/%d/cmdline' % self.pid)
            try:
                cmdline = proc_cmdline.read(4096)
            finally:
                # Always release the descriptor, even if the read fails.
                proc_cmdline.close()
        except IOError:
            return '{no such process}'
        # Arguments are NUL-separated in /proc/<pid>/cmdline.
        parts = cmdline.split('\0')
        if parts[0].startswith('/'):
            first_command_char = parts[0].rfind('/') + 1
            parts[0] = parts[0][first_command_char:]
        cmdline = ' '.join(parts).strip()
        return safe_utf8_decode(cmdline or self.name)

    def did_some_io(self):
        """Tell whether any tracked counter changed in the last sample."""
        return not self.stats_delta.is_all_zero()

    def ioprio_sort_key(self):
        """Return the sort key of the task's current I/O priority."""
        return ioprio.sort_key(self.ioprio)
class ProcessList(object):
    """Keeps the set of monitored tasks and refreshes their I/O counters."""

    def __init__(self, taskstats_connection, options):
        """Remember the stats source and options, then take a first sample."""
        self.options = options
        self.taskstats_connection = taskstats_connection
        # {pid: pinfo}
        self.processes = {}
        self.timestamp = time.time()

        # A first time as we are interested in the delta
        self.update_process_counts()

    def get_process(self, pid):
        """Return the pinfo tracked for *pid*, creating it when needed.

        Returns None when the task cannot be read from /proc or when it is
        filtered out (``valid`` is false); such tasks are not cached.
        """
        known = self.processes.get(pid)
        if known:
            return known
        try:
            fresh = pinfo(pid, self.options)
        except IOError:
            # IOError: [Errno 2] No such file or directory: '/proc/...'
            return None
        if not fresh.valid:
            return None
        self.processes[pid] = fresh
        return fresh

    def list_pids(self, tgid):
        """Return the ids to sample for the thread group *tgid*.

        That is the group itself when working per process or on explicit
        pids, otherwise every entry of /proc/<tgid>/task (an empty list if
        the directory cannot be listed).
        """
        if not (self.options.processes or self.options.pids):
            try:
                return [int(tid)
                        for tid in os.listdir('/proc/%d/task' % tgid)]
            except OSError:
                return []
        return [tgid]

    def update_process_counts(self):
        """Sample every monitored task.

        Updates ``self.duration`` and ``self.timestamp`` and returns the pair
        ``(total_read, total_write)``: the read/write byte deltas summed over
        all tasks that returned stats.
        """
        now = time.time()
        self.duration = now - self.timestamp
        self.timestamp = now

        tgids = self.options.pids
        if not tgids:
            # No explicit pids: take every numeric entry of /proc.
            tgids = [int(entry) for entry in os.listdir('/proc')
                     if '0' <= entry[0] <= '9']

        total_read = total_write = 0
        for tgid in tgids:
            for pid in self.list_pids(tgid):
                process = self.get_process(pid)
                if not process:
                    continue
                stats = self.taskstats_connection.get_task_stats(pid)
                if not stats:
                    continue
                # Got fresh stats: keep this task through refresh_processes().
                process.mark = False
                process.add_stats(stats)
                total_read += process.stats_delta.read_bytes
                total_write += process.stats_delta.write_bytes
        return total_read, total_write

    def refresh_processes(self):
        """Re-sample all tasks and forget those that returned no stats.

        Returns the ``(total_read, total_write)`` pair produced by
        update_process_counts().
        """
        for process in self.processes.values():
            process.mark = True
        totals = self.update_process_counts()
        # Anything still marked was not updated by this pass.
        stale = [pid for pid, process in self.processes.items()
                 if process.mark]
        for pid in stale:
            del self.processes[pid]
        return totals