3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
7 """Windows specific tests. These are implicitly run by test_psutil.py."""
20 import _psutil_mswindows
21 from psutil
._compat
import PY3
, callable, long
22 from test_psutil
import *
27 err
= sys
.exc_info()[1]
28 register_warning("Couldn't run wmi tests: %s" % str(err
))
34 err
= sys
.exc_info()[1]
35 register_warning("Couldn't run pywin32 tests: %s" % str(err
))
39 class WindowsSpecificTestCase(unittest
.TestCase
):
42 sproc
= get_test_subprocess()
43 wait_for_pid(sproc
.pid
)
49 def test_issue_24(self
):
51 self
.assertRaises(psutil
.AccessDenied
, p
.kill
)
53 def test_special_pid(self
):
55 self
.assertEqual(p
.name
, 'System')
56 # use __str__ to access all common Process properties to check
57 # that nothing strange happens
60 self
.assertTrue(p
.create_time
>= 0.0)
62 rss
, vms
= p
.get_memory_info()
63 except psutil
.AccessDenied
:
64 # expected on Windows Vista and Windows 7
65 if not platform
.uname()[1] in ('vista', 'win-7', 'win7'):
68 self
.assertTrue(rss
> 0)
def test_signal(self):
    # send_signal() is expected to reject SIGINT with ValueError.
    proc = psutil.Process(self.pid)
    with self.assertRaises(ValueError):
        proc.send_signal(signal.SIGINT)
74 def test_nic_names(self
):
75 p
= subprocess
.Popen(['ipconfig', '/all'], stdout
=subprocess
.PIPE
)
76 out
= p
.communicate()[0]
78 out
= str(out
, sys
.stdout
.encoding
)
79 nics
= psutil
.net_io_counters(pernic
=True).keys()
81 if "pseudo-interface" in nic
.replace(' ', '-').lower():
84 self
.fail("%r nic wasn't found in 'ipconfig /all' output" % nic
)
87 for p
in psutil
.process_iter():
89 self
.assertEqual(os
.path
.basename(p
.exe
), p
.name
)
95 # --- Process class tests
def test_process_name(self):
    # psutil's process name must equal the Caption reported by WMI
    # for the same PID.
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    actual = ps_proc.name
    expected = wmi_proc.Caption
    self.assertEqual(actual, expected)
def test_process_exe(self):
    # psutil's executable path must equal the ExecutablePath reported
    # by WMI for the same PID.
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    actual = ps_proc.exe
    expected = wmi_proc.ExecutablePath
    self.assertEqual(actual, expected)
def test_process_cmdline(self):
    # WMI returns the command line as one string that may contain
    # double quotes; psutil returns a sequence of arguments.  Join
    # psutil's arguments with spaces and strip the quotes from WMI's
    # string so the two can be compared.
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    joined = ' '.join(ps_proc.cmdline)
    unquoted = wmi_proc.CommandLine.replace('"', '')
    self.assertEqual(joined, unquoted)
def test_process_username(self):
    # psutil's username must equal "<domain>\<user>" built from the
    # first and third items returned by WMI's GetOwner().
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    domain, _ignored, account = wmi_proc.GetOwner()
    expected = "%s\\%s" % (domain, account)
    self.assertEqual(ps_proc.username, expected)
def test_process_rss_memory(self):
    # The rss field of psutil's memory info must equal WMI's
    # WorkingSetSize (converted to int) for the same PID.
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    mem_info = ps_proc.get_memory_info()
    resident = mem_info.rss
    self.assertEqual(resident, int(wmi_proc.WorkingSetSize))
def test_process_vms_memory(self):
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    mem_info = ps_proc.get_memory_info()
    vms = mem_info.vms
    # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
    # ...claims that PageFileUsage is represented in kilobytes, so the
    # check below accepts the WMI figure either as-is or multiplied
    # by 1024.
    wmi_usage = int(wmi_proc.PageFileUsage)
    accepted = (wmi_usage, wmi_usage * 1024)
    if vms not in accepted:
        self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
def test_process_create_time(self):
    # Compare creation times as "YYYYmmddHHMMSS" strings: WMI's
    # CreationDate is cut at the first '.', and psutil's create_time
    # is rendered in local time with the same layout.
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(self.pid)
    wmi_stamp = str(wmi_proc.CreationDate.partition('.')[0])
    local_struct = time.localtime(ps_proc.create_time)
    ps_stamp = time.strftime("%Y%m%d%H%M%S", local_struct)
    self.assertEqual(wmi_stamp, ps_stamp)
148 # --- psutil namespace functions and constants tests
@unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
                     'NUMBER_OF_PROCESSORS env var is not available')
def test_NUM_CPUS(self):
    # NUMBER_OF_PROCESSORS is an environment variable, not an attribute
    # of the os module.  The guard therefore has to look in os.environ:
    # hasattr(os, 'NUMBER_OF_PROCESSORS') is always False, which made
    # this test skip unconditionally.
    num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
    self.assertEqual(num_cpus, psutil.NUM_CPUS)
def test_TOTAL_PHYMEM(self):
    # psutil.TOTAL_PHYMEM must equal WMI's TotalPhysicalMemory
    # (converted to int) from the first Win32_ComputerSystem entry.
    system = wmi.WMI().Win32_ComputerSystem()[0]
    wmi_total = int(system.TotalPhysicalMemory)
    self.assertEqual(wmi_total, psutil.TOTAL_PHYMEM)
def test__UPTIME(self):
    # _UPTIME constant is not public but it is used internally
    # as value to return for pid 0 creation time.
    # WMI behaves the same.
    # NOTE(review): the WMI lookup below uses self.pid while psutil is
    # queried for pid 0 -- confirm which PID was intended.
    wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
    ps_proc = psutil.Process(0)
    wmic_create = str(wmi_proc.CreationDate.partition('.')[0])
    local_struct = time.localtime(ps_proc.create_time)
    psutil_create = time.strftime("%Y%m%d%H%M%S", local_struct)
    # XXX - ? no actual test here: the two strings computed above are
    # never compared.
def test_get_pids(self):
    # Note: this test might fail if the OS is starting/killing
    # other processes in the meantime
    #
    # PIDs are compared as sets so the result does not depend on the
    # order in which WMI and psutil enumerate processes.  The mismatch
    # report is built with set operations instead of
    # ``filter(...) + filter(...)``, which raises TypeError on
    # Python 3 where filter() returns an iterator.
    wmi_pids = set(x.ProcessId for x in wmi.WMI().Win32_Process())
    psutil_pids = set(psutil.get_pid_list())
    if wmi_pids != psutil_pids:
        # PIDs seen by exactly one of the two sources.
        difference = sorted(wmi_pids.symmetric_difference(psutil_pids))
        self.fail("difference: " + str(difference))
184 def test_disks(self
):
185 ps_parts
= psutil
.disk_partitions(all
=True)
186 wmi_parts
= wmi
.WMI().Win32_LogicalDisk()
187 for ps_part
in ps_parts
:
188 for wmi_part
in wmi_parts
:
189 if ps_part
.device
.replace('\\', '') == wmi_part
.DeviceID
:
190 if not ps_part
.mountpoint
:
191 # this is usually a CD-ROM with no disk inserted
194 usage
= psutil
.disk_usage(ps_part
.mountpoint
)
196 err
= sys
.exc_info()[1]
197 if err
.errno
== errno
.ENOENT
:
198 # usually this is the floppy
202 self
.assertEqual(usage
.total
, int(wmi_part
.Size
))
203 wmi_free
= int(wmi_part
.FreeSpace
)
204 self
.assertEqual(usage
.free
, wmi_free
)
206 if abs(usage
.free
- wmi_free
) > 10 * 1024 * 1024:
207 self
.fail("psutil=%s, wmi=%s" % usage
.free
, wmi_free
)
210 self
.fail("can't find partition %s" % repr(ps_part
))
212 if win32api
is not None:
def test_get_num_handles(self):
    # Opening a handle to the current process must raise the handle
    # count reported by psutil by one; closing it must bring the count
    # back to the starting value.
    p = psutil.Process(os.getpid())
    before = p.get_num_handles()
    handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                  win32con.FALSE, os.getpid())
    try:
        after = p.get_num_handles()
        self.assertEqual(after, before + 1)
    finally:
        # Always release the handle, even when the assertion above
        # fails, so a failing run does not leak it.
        win32api.CloseHandle(handle)
    self.assertEqual(p.get_num_handles(), before)
224 def test_get_num_handles_2(self
):
225 # Note: this fails from time to time; I'm inclined to think
226 # that doesn't mean something is broken
228 attr
= getattr(p
, name
, None)
229 if attr
is not None and callable(attr
):
234 p
= psutil
.Process(self
.pid
)
237 for name
in dir(psutil
.Process
):
238 if name
.startswith('_') \
239 or name
.startswith('set_') \
240 or name
in ('terminate', 'kill', 'suspend', 'resume', 'nice',
241 'send_signal', 'wait', 'get_children', 'as_dict'):
246 num1
= p
.get_num_handles()
248 num2
= p
.get_num_handles()
249 except (psutil
.NoSuchProcess
, psutil
.AccessDenied
):
253 fail
= "failure while processing Process.%s method " \
254 "(before=%s, after=%s)" % (name
, num1
, num2
)
255 failures
.append(fail
)
257 self
.fail('\n' + '\n'.join(failures
))
260 import _psutil_mswindows
261 from psutil
._psmswindows
import ACCESS_DENIED_SET
263 def wrap_exceptions(callable):
264 def wrapper(self
, *args
, **kwargs
):
266 return callable(self
, *args
, **kwargs
)
268 err
= sys
.exc_info()[1]
269 if err
.errno
in ACCESS_DENIED_SET
:
270 raise psutil
.AccessDenied(None, None)
271 if err
.errno
== errno
.ESRCH
:
272 raise psutil
.NoSuchProcess(None, None)
276 class TestDualProcessImplementation(unittest
.TestCase
):
278 # function name tolerance
279 ('get_process_cpu_times', 0.2),
280 ('get_process_create_time', 0.5),
281 ('get_process_num_handles', 1), # 1 because impl #1 opens a handle
282 ('get_process_io_counters', 0),
283 ('get_process_memory_info', 1024), # KB
286 def test_compare_values(self
):
287 # Certain APIs on Windows have 2 internal implementations, one
288 # based on documented Windows APIs, another one based on
289 # NtQuerySystemInformation() which gets called as fallback in
290 # case the first fails because of limited permission error.
291 # Here we test that the two methods return the exact same value,
293 # http://code.google.com/p/psutil/issues/detail?id=304
294 def assert_ge_0(obj
):
295 if isinstance(obj
, tuple):
297 self
.assertGreaterEqual(value
, 0)
298 elif isinstance(obj
, (int, long, float)):
299 self
.assertGreaterEqual(obj
, 0)
301 assert 0 # case not handled which needs to be fixed
303 def compare_with_tolerance(ret1
, ret2
, tolerance
):
307 if isinstance(ret2
, (int, long, float)):
308 diff
= abs(ret1
- ret2
)
309 self
.assertLessEqual(diff
, tolerance
)
310 elif isinstance(ret2
, tuple):
311 for a
, b
in zip(ret1
, ret2
):
313 self
.assertLessEqual(diff
, tolerance
)
316 for name
, tolerance
in self
.fun_names
:
317 meth1
= wrap_exceptions(getattr(_psutil_mswindows
, name
))
318 meth2
= wrap_exceptions(getattr(_psutil_mswindows
, name
+ '_2'))
319 for p
in psutil
.process_iter():
320 if name
== 'get_process_memory_info' and p
.pid
== os
.getpid():
325 except psutil
.NoSuchProcess
:
327 except psutil
.AccessDenied
:
332 except psutil
.NoSuchProcess
:
333 # this is supposed to fail only in case of zombie process
334 # never for permission error
342 compare_with_tolerance(ret1
, ret2
, tolerance
)
345 except AssertionError:
346 err
= sys
.exc_info()[1]
347 trace
= traceback
.format_exc()
348 msg
= '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' \
349 % (trace
, p
.pid
, name
, ret1
, ret2
)
353 self
.fail('\n\n'.join(failures
))
def test_zombies(self):
    # Check that NoSuchProcess is raised when a function is called
    # with a PID that no longer exists.  The PID used is 5000 above
    # the highest PID currently listed.
    # NOTE(review): the original comment says this targets the 2nd
    # implementation, but the lookup uses the plain name (no '_2'
    # suffix) -- confirm which was intended.
    missing_pid = max(psutil.get_pid_list()) + 5000
    for entry in self.fun_names:
        name, _tolerance = entry
        wrapped = wrap_exceptions(getattr(_psutil_mswindows, name))
        self.assertRaises(psutil.NoSuchProcess, wrapped, missing_pid)
365 test_suite
= unittest
.TestSuite()
366 test_suite
.addTest(unittest
.makeSuite(WindowsSpecificTestCase
))
367 test_suite
.addTest(unittest
.makeSuite(TestDualProcessImplementation
))
368 result
= unittest
.TextTestRunner(verbosity
=2).run(test_suite
)
369 return result
.wasSuccessful()
371 if __name__
== '__main__':