Bumping manifests a=b2g-bump
[gecko.git] / python / psutil / test / _windows.py
blobfebaa232075d923960aa30cf79a559ba2ea142e6
1 #!/usr/bin/env python
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
7 """Windows specific tests. These are implicitly run by test_psutil.py."""
9 import os
10 import unittest
11 import platform
12 import signal
13 import time
14 import sys
15 import subprocess
16 import errno
17 import traceback
19 import psutil
20 import _psutil_mswindows
21 from psutil._compat import PY3, callable, long
22 from test_psutil import *
# wmi is optional: if it cannot be imported, a warning is registered
# through register_warning() and the name is bound to None.
try:
    import wmi
except ImportError:
    err = sys.exc_info()[1]
    register_warning("Couldn't run wmi tests: %s" % str(err))
    wmi = None
# pywin32 is optional as well.  Note that only win32api is rebound to
# None on failure; win32con is left undefined in that case.
try:
    import win32api
    import win32con
except ImportError:
    err = sys.exc_info()[1]
    register_warning("Couldn't run pywin32 tests: %s" % str(err))
    win32api = None
class WindowsSpecificTestCase(unittest.TestCase):
    """Windows-only psutil tests.

    Results are cross-checked against 'ipconfig', WMI (only when the
    wmi module could be imported) and pywin32 (only when win32api could
    be imported).
    """

    def setUp(self):
        # Spawn a helper subprocess and remember its pid; most tests
        # inspect that process.
        sproc = get_test_subprocess()
        wait_for_pid(sproc.pid)
        self.pid = sproc.pid

    def tearDown(self):
        reap_children()

    def test_issue_24(self):
        # Killing pid 0 is expected to raise AccessDenied.
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name, 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username
        self.assertTrue(p.create_time >= 0.0)
        try:
            rss, vms = p.get_memory_info()
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            # NOTE(review): platform.uname()[1] is the network node
            # name, not the OS release -- confirm this is intended.
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_signal(self):
        # Sending SIGINT is expected to raise ValueError.
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_nic_names(self):
        # Every NIC reported by psutil must appear in 'ipconfig /all'.
        p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
        out = p.communicate()[0]
        if PY3:
            out = str(out, sys.stdout.encoding)
        for nic in psutil.net_io_counters(pernic=True):
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_exe(self):
        # Processes raising psutil.Error are deliberately ignored.
        for p in psutil.process_iter():
            try:
                self.assertEqual(os.path.basename(p.exe), p.name)
            except psutil.Error:
                pass

    if wmi is not None:

        # --- Process class tests

        def test_process_name(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.name, w.Caption)

        def test_process_exe(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.exe, w.ExecutablePath)

        def test_process_cmdline(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            # WMI keeps the double quotes; strip them before comparing.
            self.assertEqual(' '.join(p.cmdline),
                             w.CommandLine.replace('"', ''))

        def test_process_username(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            domain, _, username = w.GetOwner()
            username = "%s\\%s" % (domain, username)
            self.assertEqual(p.username, username)

        def test_process_rss_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            rss = p.get_memory_info().rss
            self.assertEqual(rss, int(w.WorkingSetSize))

        def test_process_vms_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            vms = p.get_memory_info().vms
            # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
            # ...claims that PageFileUsage is represented in Kilo
            # bytes but funnily enough on certain platforms bytes are
            # returned instead.
            wmi_usage = int(w.PageFileUsage)
            if (vms != wmi_usage) and (vms != wmi_usage * 1024):
                self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

        def test_process_create_time(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            # Compare with one-second resolution (YYYYmmddHHMMSS).
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            self.assertEqual(wmic_create, psutil_create)

        # --- psutil namespace functions and constants tests

        # NUMBER_OF_PROCESSORS is an environment variable, so it must be
        # looked up in os.environ; hasattr(os, ...) is never true and
        # would cause this test to be skipped unconditionally.
        @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
                             'NUMBER_OF_PROCESSORS env var is not available')
        def test_NUM_CPUS(self):
            num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
            self.assertEqual(num_cpus, psutil.NUM_CPUS)

        def test_TOTAL_PHYMEM(self):
            w = wmi.WMI().Win32_ComputerSystem()[0]
            self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM)

        def test__UPTIME(self):
            # _UPTIME constant is not public but it is used internally
            # as value to return for pid 0 creation time.
            # WMI behaves the same.
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(0)
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            # XXX - ? no actual test here
            # NOTE(review): both values are computed but never compared;
            # this only checks that the calls above do not raise.

        def test_get_pids(self):
            # Note: this test might fail if the OS is starting/killing
            # other processes in the meantime
            w = wmi.WMI().Win32_Process()
            wmi_pids = [x.ProcessId for x in w]
            wmi_pids.sort()
            psutil_pids = psutil.get_pid_list()
            psutil_pids.sort()
            if wmi_pids != psutil_pids:
                # List comprehensions instead of filter() + filter():
                # filter objects cannot be concatenated on Python 3.
                difference = \
                    [x for x in psutil_pids if x not in wmi_pids] + \
                    [x for x in wmi_pids if x not in psutil_pids]
                self.fail("difference: " + str(difference))

        def test_disks(self):
            ps_parts = psutil.disk_partitions(all=True)
            wmi_parts = wmi.WMI().Win32_LogicalDisk()
            for ps_part in ps_parts:
                for wmi_part in wmi_parts:
                    if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                        if not ps_part.mountpoint:
                            # this is usually a CD-ROM with no disk inserted
                            break
                        try:
                            usage = psutil.disk_usage(ps_part.mountpoint)
                        except OSError:
                            err = sys.exc_info()[1]
                            if err.errno == errno.ENOENT:
                                # usually this is the floppy
                                break
                            else:
                                raise
                        self.assertEqual(usage.total, int(wmi_part.Size))
                        wmi_free = int(wmi_part.FreeSpace)
                        # NOTE(review): this strict comparison makes the
                        # tolerance check below unreachable -- confirm
                        # which of the two is intended.
                        self.assertEqual(usage.free, wmi_free)
                        # 10 MB tolerance
                        if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                            # Both values must be passed as one tuple.
                            self.fail("psutil=%s, wmi=%s"
                                      % (usage.free, wmi_free))
                        break
                else:
                    # No WMI logical disk matched this psutil partition.
                    self.fail("can't find partition %s" % repr(ps_part))

    if win32api is not None:

        def test_get_num_handles(self):
            p = psutil.Process(os.getpid())
            before = p.get_num_handles()
            handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                          win32con.FALSE, os.getpid())
            # Always release the handle, even if the assertion fails,
            # so a failure here does not leak it into later tests.
            try:
                after = p.get_num_handles()
                self.assertEqual(after, before + 1)
            finally:
                win32api.CloseHandle(handle)
            self.assertEqual(p.get_num_handles(), before)

        def test_get_num_handles_2(self):
            # Note: this fails from time to time; I'm keen on thinking
            # it doesn't mean something is broken
            def call(proc, attr_name):
                # Look the attribute up and, if it is callable, invoke
                # it; the result is discarded, only the side effect on
                # the handle count matters.
                attr = getattr(proc, attr_name, None)
                if attr is not None and callable(attr):
                    attr()

            skipped = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                       'send_signal', 'wait', 'get_children', 'as_dict')
            p = psutil.Process(self.pid)
            failures = []
            for name in dir(psutil.Process):
                if name.startswith(('_', 'set_')) or name in skipped:
                    continue
                try:
                    call(p, name)
                    num1 = p.get_num_handles()
                    call(p, name)
                    num2 = p.get_num_handles()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
                else:
                    # A second access must not leave more handles open
                    # than the first one did.
                    if num2 > num1:
                        fail = "failure while processing Process.%s method " \
                               "(before=%s, after=%s)" % (name, num1, num2)
                        failures.append(fail)
            if failures:
                self.fail('\n' + '\n'.join(failures))
260 import _psutil_mswindows
261 from psutil._psmswindows import ACCESS_DENIED_SET
def wrap_exceptions(callable):
    """Decorator translating OSError into psutil exceptions.

    The returned wrapper calls ``callable(self, *args, **kwargs)`` and
    returns its result.  If the call raises OSError:

    - an errno contained in ACCESS_DENIED_SET is re-raised as
      ``psutil.AccessDenied(None, None)``;
    - ``errno.ESRCH`` is re-raised as ``psutil.NoSuchProcess(None, None)``;
    - any other OSError propagates unchanged.

    Exceptions other than OSError are not intercepted.  The parameter
    name shadows the ``callable`` builtin; it is kept as is so existing
    callers are unaffected.
    """
    import functools

    # functools.wraps keeps the wrapped function's name and docstring.
    @functools.wraps(callable)
    def wrapper(self, *args, **kwargs):
        try:
            return callable(self, *args, **kwargs)
        except OSError:
            err = sys.exc_info()[1]
            if err.errno in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            if err.errno == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            raise
    return wrapper
class TestDualProcessImplementation(unittest.TestCase):
    """Compare the two implementations of some _psutil_mswindows APIs.

    For every name listed in fun_names, _psutil_mswindows is expected to
    provide both ``<name>`` and ``<name>_2``.
    """

    # (function name, maximum accepted difference between the values
    # returned by the two implementations)
    fun_names = [
        # function name                 tolerance
        ('get_process_cpu_times', 0.2),
        ('get_process_create_time', 0.5),
        ('get_process_num_handles', 1),  # 1 because impl #1 opens a handle
        ('get_process_io_counters', 0),
        ('get_process_memory_info', 1024),  # KB
    ]

    def test_compare_values(self):
        # Certain APIs on Windows have 2 internal implementations, one
        # based on documented Windows APIs, another one based
        # NtQuerySystemInformation() which gets called as fallback in
        # case the first fails because of limited permission error.
        # Here we test that the two methods return the exact same value,
        # see:
        # http://code.google.com/p/psutil/issues/detail?id=304
        def assert_ge_0(obj):
            # Every returned number (or tuple member) must be >= 0.
            if isinstance(obj, tuple):
                for value in obj:
                    self.assertGreaterEqual(value, 0)
            elif isinstance(obj, (int, long, float)):
                self.assertGreaterEqual(obj, 0)
            else:
                # case not handled which needs to be fixed; self.fail is
                # used instead of "assert 0" so the check is not
                # stripped when running with -O.
                self.fail("unhandled return type: %r" % (obj,))

        def compare_with_tolerance(ret1, ret2, tolerance):
            # Equal values pass; otherwise numbers, or tuples compared
            # member by member, may differ by at most `tolerance`.
            if ret1 == ret2:
                return
            if isinstance(ret2, (int, long, float)):
                diff = abs(ret1 - ret2)
                self.assertLessEqual(diff, tolerance)
            elif isinstance(ret2, tuple):
                for a, b in zip(ret1, ret2):
                    diff = abs(a - b)
                    self.assertLessEqual(diff, tolerance)

        failures = []
        for name, tolerance in self.fun_names:
            meth1 = wrap_exceptions(getattr(_psutil_mswindows, name))
            meth2 = wrap_exceptions(getattr(_psutil_mswindows, name + '_2'))
            for p in psutil.process_iter():
                # The memory info of the current process is not compared.
                if name == 'get_process_memory_info' and p.pid == os.getpid():
                    continue

                try:
                    ret1 = meth1(p.pid)
                except psutil.NoSuchProcess:
                    continue
                except psutil.AccessDenied:
                    # First implementation was denied; only the second
                    # one is validated below.
                    ret1 = None

                try:
                    ret2 = meth2(p.pid)
                except psutil.NoSuchProcess:
                    # this is supposed to fail only in case of zombie process
                    # never for permission error
                    continue

                # compare values
                try:
                    if ret1 is None:
                        assert_ge_0(ret2)
                    else:
                        compare_with_tolerance(ret1, ret2, tolerance)
                        assert_ge_0(ret1)
                        assert_ge_0(ret2)
                except AssertionError:
                    trace = traceback.format_exc()
                    msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \
                          % (trace, p.pid, name, ret1, ret2)
                    failures.append(msg)
                    # One recorded failure per function is enough; move
                    # on to the next function name.
                    break
        if failures:
            self.fail('\n\n'.join(failures))

    def test_zombies(self):
        # test that NSP (NoSuchProcess) is raised in case a process no
        # longer exists
        # NOTE(review): the original comment mentions the 2nd
        # implementation, but only the first one (`name`, not
        # `name + '_2'`) is looked up here -- confirm which is intended.
        ZOMBIE_PID = max(psutil.get_pid_list()) + 5000
        for name, _ in self.fun_names:
            meth = wrap_exceptions(getattr(_psutil_mswindows, name))
            self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
def test_main():
    """Run the Windows test cases with a verbose text runner.

    Returns True if every test in WindowsSpecificTestCase and
    TestDualProcessImplementation succeeded, False otherwise.
    """
    # loadTestsFromTestCase replaces unittest.makeSuite, which is
    # deprecated and no longer available on recent Python versions.
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    for case in (WindowsSpecificTestCase, TestDualProcessImplementation):
        test_suite.addTest(loader.loadTestsFromTestCase(case))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
# When executed as a script, run the suite and exit with status 1 if it
# was not successful.
if __name__ == '__main__':
    if not test_main():
        sys.exit(1)