Bumping manifests a=b2g-bump
[gecko.git] / python / psutil / test / test_psutil.py
blobe0179ebf65968f6ff909cdfe9628168b3291aafc
1 #!/usr/bin/env python
3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
7 """
8 psutil test suite (you can quickly run it with "python setup.py test").
10 Note: this is targeted for both python 2.x and 3.x so there's no need
11 to use 2to3 tool first.
13 If you're on Python < 2.7 it is recommended to install unittest2 module
14 from: https://pypi.python.org/pypi/unittest2
15 """
17 from __future__ import division
18 import os
19 import sys
20 import subprocess
21 import time
22 import signal
23 import types
24 import traceback
25 import socket
26 import warnings
27 import atexit
28 import errno
29 import threading
30 import tempfile
31 import stat
32 import collections
33 import datetime
34 try:
35 import unittest2 as unittest # pyhon < 2.7 + unittest2 installed
36 except ImportError:
37 import unittest
38 try:
39 import ast # python >= 2.6
40 except ImportError:
41 ast = None
43 import psutil
44 from psutil._compat import PY3, callable, long, wraps
47 # ===================================================================
48 # --- Constants
49 # ===================================================================
# Default number of attempts made by the retry_before_failing() decorator.
NO_RETRIES = 10
# Slack, in bytes, granted to the OS memory related tests (500KB).
TOLERANCE = 1024 * 500

# Platform flags.
POSIX = (os.name == 'posix')
LINUX = sys.platform.startswith("linux")
WINDOWS = sys.platform.startswith("win32")
OSX = sys.platform.startswith("darwin")
BSD = sys.platform.startswith("freebsd")
SUNOS = sys.platform.startswith("sunos")

# Resolved path of the interpreter running this suite.
PYTHON = os.path.realpath(sys.executable)
# Read/write handle on the null device.
DEVNULL = open(os.devnull, 'r+')
# Scratch file located in the current working directory.
TESTFN = os.path.join(os.getcwd(), "$testfile")
# "examples" directory, sibling of the directory holding this file.
EXAMPLES_DIR = os.path.abspath(
    os.path.join(os.path.dirname(os.path.dirname(__file__)), 'examples'))
69 # ===================================================================
70 # --- Utility functions
71 # ===================================================================
# pids of every child spawned through get_test_subprocess()
_subprocesses_started = set()


def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=DEVNULL,
                        wait=False):
    """Spawn a child process for a test and return its subprocess.Popen.

    When 'cmd' is None the child is the python interpreter sleeping for
    2 seconds.  stdout, stderr and stdin default to DEVNULL.
    If 'wait' is True attempts to make sure the process is in a
    reasonably initialized state before returning: the default child is
    asked to create TESTFN, which is polled for up to 3 seconds (a
    warning is issued if it never shows up); a custom 'cmd' is waited
    for with wait_for_pid().
    The child's pid is recorded in _subprocesses_started.
    """
    use_default = cmd is None
    if use_default:
        snippet = "import time; time.sleep(2);"
        if wait:
            # the child signals it is up and running by creating TESTFN
            snippet = ("open(r'%s', 'w'); " % TESTFN) + snippet
        argv = [PYTHON, "-c", snippet]
    else:
        argv = cmd
    sproc = subprocess.Popen(argv, stdout=stdout, stderr=stderr, stdin=stdin)
    if wait and use_default:
        deadline = time.time() + 3
        created = False
        while deadline > time.time():
            created = os.path.exists(TESTFN)
            if created:
                break
            time.sleep(0.001)
        if not created:
            warn("couldn't make sure test file was actually created")
    elif wait:
        wait_for_pid(sproc.pid)
    _subprocesses_started.add(sproc.pid)
    return sproc
def warn(msg):
    """Issue 'msg' as a UserWarning through the warnings module."""
    warnings.warn(msg, category=UserWarning)
def register_warning(msg):
    """Schedule 'msg' to be emitted through warn() at interpreter exit."""
    def emit():
        warn(msg)
    atexit.register(emit)
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Run 'cmdline' through the shell and return its stripped stdout.

    A non-zero exit status raises RuntimeError carrying whatever the
    command wrote to stderr.  stderr output produced by a successful
    command is re-issued as a warning.
    On Python 3 both streams are decoded to str.
    """
    p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
    out, err = p.communicate()
    if PY3:
        # sys.stdout may have been replaced by an object whose encoding
        # is missing or None; fall back on the filesystem encoding
        # instead of crashing in str().
        encoding = getattr(sys.stdout, 'encoding', None) \
            or sys.getfilesystemencoding()
        if out is not None:
            out = str(out, encoding)
        if err is not None:
            # decode stderr as well so that error/warning messages are
            # readable text rather than a bytes repr
            err = str(err, encoding)
    if p.returncode != 0:
        raise RuntimeError(err)
    if err:
        warn(err)
    return out.strip()
def which(program):
    """Same as UNIX which command. Return None on command not found.

    If 'program' contains a directory component it is checked as is,
    otherwise every directory listed in the PATH environment variable
    is searched.  When PATH is not set os.defpath is searched instead.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        if is_exe(program):
            return program
    else:
        # PATH may legitimately be missing from the environment; honour
        # the "return None" contract instead of raising KeyError.
        search_path = os.environ.get("PATH", os.defpath)
        for path in search_path.split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None
def wait_for_pid(pid, timeout=1):
    """Block until 'pid' is listed by psutil.get_pid_list().

    Used in the test suite to give the sub process time to initialize.
    Raises RuntimeError("Timed out") if the pid did not show up within
    'timeout' seconds.
    """
    deadline = time.time() + timeout
    while pid not in psutil.get_pid_list():
        time.sleep(0.0001)
        if time.time() >= deadline:
            raise RuntimeError("Timed out")
    # give it one more iteration to allow full initialization
    time.sleep(0.01)
def reap_children(search_all=False):
    """Kill every subprocess started by this test suite and wait for it,
    so that no zombies stick around to hog resources and create problems
    when looking for refleaks.

    If 'search_all' is True all the descendants of the current process
    are killed as well.
    """
    # same object as the module level registry: it gets emptied as we go
    pending = _subprocesses_started
    if search_all:
        me = psutil.Process(os.getpid())
        for descendant in me.get_children(recursive=True):
            pending.add(descendant.pid)
    while pending:
        pid = pending.pop()
        try:
            child = psutil.Process(pid)
            child.kill()
        except psutil.NoSuchProcess:
            # already gone
            continue
        except psutil.AccessDenied:
            warn("couldn't kill child process with pid %s" % pid)
            continue
        child.wait(timeout=3)
def check_ip_address(addr, family):
    """Sanity check an (ip, port) tuple by means of assert statements.

    An empty 'addr' is accepted as is.  The port must be an int in the
    0-65535 range.  The ip is inspected only when 'family' is AF_INET,
    in which case it must be made of 4 dot-separated numbers in the
    0-255 range.
    """
    if addr:
        host, port = addr
        assert isinstance(port, int), port
        if family == socket.AF_INET:
            octets = [int(part) for part in host.split('.')]
            assert len(octets) == 4, octets
            for octet in octets:
                assert 0 <= octet <= 255, octets
        assert 0 <= port <= 65535, port
def safe_remove(fname):
    """Delete 'fname'; a file which does not exist is not an error.

    Any OSError other than ENOENT is propagated.
    """
    try:
        os.remove(fname)
    except OSError:
        code = sys.exc_info()[1].args[0]
        if code == errno.ENOENT:
            return
        raise
def call_until(fun, expr, timeout=1):
    """Keep calling fun() for 'timeout' secs and return its result as
    soon as the eval()uated 'expr' expression is True.

    'expr' is a string which can refer to the last value returned by
    fun() by using the name "ret", e.g. "len(ret) != 0".
    Raises RuntimeError if the expression never became True in time,
    including the case where 'timeout' leaves no room for a single call.
    """
    # pre-bound so that the error message below can always be built,
    # even if the loop body never runs
    ret = None
    stop_at = time.time() + timeout
    while time.time() < stop_at:
        ret = fun()
        # NOTE: eval() runs arbitrary code; 'expr' must only ever be a
        # trusted, hard-coded string.  It relies on the local name "ret".
        if eval(expr):
            return ret
        time.sleep(0.001)
    # wrap in a tuple: a bare tuple result would otherwise be taken as
    # the list of format arguments
    raise RuntimeError('timed out (ret=%r)' % (ret,))
def retry_before_failing(ntimes=None):
    """Decorator which runs a test function and retries N times before
    actually failing.

    Only AssertionError triggers a retry; any other exception is
    propagated immediately.  'ntimes' is the total number of attempts
    and defaults to NO_RETRIES.  The AssertionError raised by the last
    attempt is the one the caller gets to see.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            attempts = ntimes or NO_RETRIES
            for _ in range(attempts - 1):
                try:
                    return fun(*args, **kwargs)
                except AssertionError:
                    pass
            # Last attempt: let the failure propagate by itself.  A bare
            # "raise" placed after the loop has no active exception to
            # re-raise on Python 3.
            return fun(*args, **kwargs)
        return wrapper
    return decorator
def skip_on_access_denied(only_if=None):
    """Decorator to Ignore AccessDenied exceptions.

    If the decorated test method raises psutil.AccessDenied the test is
    marked as skipped instead of failing.  When 'only_if' is given and
    evaluates to False the exception is re-raised instead.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except psutil.AccessDenied:
                if only_if is not None:
                    if not only_if:
                        raise
                msg = "%r was skipped because it raised AccessDenied" \
                      % fun.__name__
                self = args[0]
                # unittest >= 2.7 and unittest2 call it skipTest(); the
                # fallback TestCase defined in this file calls it skip()
                skip = getattr(self, 'skipTest', None) \
                    or getattr(self, 'skip', None)
                if skip is not None:
                    skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
def skip_on_not_implemented(only_if=None):
    """Decorator to Ignore NotImplementedError exceptions.

    If the decorated test method raises NotImplementedError the test is
    marked as skipped instead of failing.  When 'only_if' is given and
    evaluates to False the exception is re-raised instead.
    """
    def decorator(fun):
        @wraps(fun)
        def wrapper(*args, **kwargs):
            try:
                return fun(*args, **kwargs)
            except NotImplementedError:
                if only_if is not None:
                    if not only_if:
                        raise
                msg = "%r was skipped because it raised NotImplementedError" \
                      % fun.__name__
                self = args[0]
                # unittest >= 2.7 and unittest2 call it skipTest(); the
                # fallback TestCase defined in this file calls it skip()
                skip = getattr(self, 'skipTest', None) \
                    or getattr(self, 'skip', None)
                if skip is not None:
                    skip(msg)
                else:
                    register_warning(msg)
        return wrapper
    return decorator
def supports_ipv6():
    """Return True if an IPv6 TCP socket can be created and bound to the
    "::1" loopback address on this platform, else False.
    """
    if not (socket.has_ipv6 and hasattr(socket, "AF_INET6")):
        return False
    probe = None
    try:
        try:
            probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            probe.bind(("::1", 0))
            usable = True
        except (socket.error, socket.gaierror):
            usable = False
    finally:
        # the probing socket is released no matter what
        if probe is not None:
            probe.close()
    return usable
class ThreadTask(threading.Thread):
    """A thread which idles until told to stop; used by the process
    thread tests.
    """

    def __init__(self):
        super(ThreadTask, self).__init__()
        self._running = False
        self._interval = None
        # set by run() once the thread body has actually begun
        self._flag = threading.Event()

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))

    def start(self, interval=0.001):
        """Start the thread and keep it running until an explicit
        stop() request.  The thread polls for shutdown every 'interval'
        seconds.  Returns once run() has begun executing.
        Raises ValueError if the thread is already running.
        """
        if self._running:
            raise ValueError("already started")
        self._interval = interval
        super(ThreadTask, self).start()
        self._flag.wait()

    def run(self):
        self._running = True
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Stop thread execution and wait until it is stopped.
        Raises ValueError if the thread is not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()
327 # ===================================================================
328 # --- Support for python < 2.7 in case unittest2 is not installed
329 # ===================================================================
if not hasattr(unittest, 'skip'):
    # The unittest module in use has no skip support: install minimal
    # replacements for TestCase, skipIf and skipUnless on it.
    register_warning("unittest2 module is not installed; a serie of pretty "
                     "darn ugly workarounds will be used")

    class SkipTest(Exception):
        pass

    class TestCase(unittest.TestCase):
        """unittest.TestCase plus the assert*() helpers and the skip()
        method the rest of this file relies on.
        """

        def _safe_repr(self, obj):
            # repr() which never raises and is capped in length
            limit = 80
            try:
                text = repr(obj)
            except Exception:
                text = object.__repr__(obj)
            if len(text) >= limit:
                return text[:limit] + ' [truncated]...'
            return text

        def _fail_w_msg(self, a, b, middle, msg):
            # fail with 'msg' or, lacking that, with "<a> <middle> <b>"
            self.fail(msg or '%s %s %s' % (self._safe_repr(a), middle,
                                           self._safe_repr(b)))

        def skip(self, msg):
            raise SkipTest(msg)

        def assertIn(self, a, b, msg=None):
            if not (a in b):
                self._fail_w_msg(a, b, 'not found in', msg)

        def assertNotIn(self, a, b, msg=None):
            if a in b:
                self._fail_w_msg(a, b, 'found in', msg)

        def assertGreater(self, a, b, msg=None):
            if not (a > b):
                self._fail_w_msg(a, b, 'not greater than', msg)

        def assertGreaterEqual(self, a, b, msg=None):
            if not (a >= b):
                self._fail_w_msg(a, b, 'not greater than or equal to', msg)

        def assertLess(self, a, b, msg=None):
            if not (a < b):
                self._fail_w_msg(a, b, 'not less than', msg)

        def assertLessEqual(self, a, b, msg=None):
            if not (a <= b):
                self._fail_w_msg(a, b, 'not less or equal to', msg)

        def assertIsInstance(self, a, b, msg=None):
            if isinstance(a, b):
                return
            self.fail(msg or '%s is not an instance of %r'
                      % (self._safe_repr(a), b))

        def assertAlmostEqual(self, a, b, msg=None, delta=None):
            # without 'delta' this is a plain equality check
            if delta is None:
                self.assertEqual(a, b, msg=msg)
            elif not (abs(a - b) <= delta):
                self.fail(msg or '%s != %s within %s delta'
                          % (self._safe_repr(a), self._safe_repr(b),
                             self._safe_repr(delta)))

    def skipIf(condition, reason):
        """Decorator: when 'condition' is true the test method is not
        run; "skipped-" is written to stdout and a warning is registered
        for interpreter exit.
        """
        def decorator(fun):
            @wraps(fun)
            def wrapper(*args, **kwargs):
                self = args[0]
                if not condition:
                    return fun(*args, **kwargs)
                sys.stdout.write("skipped-")
                sys.stdout.flush()
                if warn:
                    msg = "%s.%s was skipped" % (self.__class__.__name__,
                                                 fun.__name__)
                    if reason:
                        msg += "; reason: " + repr(reason)
                    register_warning(msg)
                return
            return wrapper
        return decorator

    def skipUnless(condition, reason):
        """Decorator: the inverse of skipIf()."""
        return unittest.skipIf(not condition, reason)

    unittest.TestCase = TestCase
    unittest.skipIf = skipIf
    unittest.skipUnless = skipUnless
    del TestCase, skipIf, skipUnless
429 # ===================================================================
430 # --- System-related API tests
431 # ===================================================================
class TestSystemAPIs(unittest.TestCase):
    """Tests for system-related APIs."""

    def setUp(self):
        # start every test without the scratch file around
        safe_remove(TESTFN)

    def tearDown(self):
        # kill any subprocess the test left behind
        reap_children()

    def test_process_iter(self):
        # a process must be listed while alive and gone once killed
        self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
        sproc = get_test_subprocess()
        self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])

    def test_TOTAL_PHYMEM(self):
        x = psutil.TOTAL_PHYMEM
        self.assertIsInstance(x, (int, long))
        self.assertGreater(x, 0)
        self.assertEqual(x, psutil.virtual_memory().total)

    def test_BOOT_TIME(self, arg=None):
        # 'arg' lets test_get_boot_time() reuse these checks on its own value
        x = arg or psutil.BOOT_TIME
        self.assertIsInstance(x, float)
        self.assertGreater(x, 0)
        self.assertLess(x, time.time())

    def test_get_boot_time(self):
        self.test_BOOT_TIME(psutil.get_boot_time())
        if WINDOWS:
            # work around float precision issues; give it 1 secs tolerance
            diff = abs(psutil.get_boot_time() - psutil.BOOT_TIME)
            self.assertLess(diff, 1)
        else:
            self.assertEqual(psutil.get_boot_time(), psutil.BOOT_TIME)

    def test_NUM_CPUS(self):
        self.assertEqual(psutil.NUM_CPUS, len(psutil.cpu_times(percpu=True)))
        self.assertGreaterEqual(psutil.NUM_CPUS, 1)

    @unittest.skipUnless(POSIX, 'posix only')
    def test_PAGESIZE(self):
        # pagesize is used internally to perform different calculations
        # and it's determined by using SC_PAGE_SIZE; make sure
        # getpagesize() returns the same value.
        import resource
        self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())

    def test_deprecated_apis(self):
        # every deprecated API must emit a DeprecationWarning; warnings
        # are turned into exceptions for the duration of the test
        s = socket.socket()
        s.bind(('localhost', 0))
        s.listen(1)
        warnings.filterwarnings("error")
        p = psutil.Process(os.getpid())
        try:
            self.assertRaises(DeprecationWarning, psutil.virtmem_usage)
            self.assertRaises(DeprecationWarning, psutil.used_phymem)
            self.assertRaises(DeprecationWarning, psutil.avail_phymem)
            self.assertRaises(DeprecationWarning, psutil.total_virtmem)
            self.assertRaises(DeprecationWarning, psutil.used_virtmem)
            self.assertRaises(DeprecationWarning, psutil.avail_virtmem)
            self.assertRaises(DeprecationWarning, psutil.phymem_usage)
            self.assertRaises(DeprecationWarning, psutil.get_process_list)
            self.assertRaises(DeprecationWarning, psutil.network_io_counters)
            if LINUX:
                self.assertRaises(DeprecationWarning, psutil.phymem_buffers)
                self.assertRaises(DeprecationWarning, psutil.cached_phymem)
            try:
                p.nice
            except DeprecationWarning:
                pass
            else:
                self.fail("p.nice didn't raise DeprecationWarning")
            # the listening socket created above guarantees that at
            # least one connection eventually shows up
            ret = call_until(p.get_connections, "len(ret) != 0", timeout=1)
            self.assertRaises(DeprecationWarning,
                              getattr, ret[0], 'local_address')
            self.assertRaises(DeprecationWarning,
                              getattr, ret[0], 'remote_address')
        finally:
            s.close()
            warnings.resetwarnings()

    def test_deprecated_apis_retval(self):
        # deprecated APIs must return the same value as their replacement
        warnings.filterwarnings("ignore")
        p = psutil.Process(os.getpid())
        try:
            self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total)
            self.assertEqual(p.nice, p.get_nice())
        finally:
            warnings.resetwarnings()

    def test_virtual_memory(self):
        mem = psutil.virtual_memory()
        assert mem.total > 0, mem
        assert mem.available > 0, mem
        assert 0 <= mem.percent <= 100, mem
        assert mem.used > 0, mem
        assert mem.free >= 0, mem
        # every field other than 'total' must be within [0, total]
        for name in mem._fields:
            if name != 'total':
                value = getattr(mem, name)
                if not value >= 0:
                    self.fail("%r < 0 (%s)" % (name, value))
                if value > mem.total:
                    self.fail("%r > total (total=%s, %s=%s)"
                              % (name, mem.total, name, value))

    def test_swap_memory(self):
        mem = psutil.swap_memory()
        assert mem.total >= 0, mem
        assert mem.used >= 0, mem
        # total is allowed to be 0 (no swap configured), in which case
        # free cannot be greater than 0 either
        assert mem.free >= 0, mem
        assert 0 <= mem.percent <= 100, mem
        assert mem.sin >= 0, mem
        assert mem.sout >= 0, mem

    def test_pid_exists(self):
        sproc = get_test_subprocess(wait=True)
        assert psutil.pid_exists(sproc.pid)
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))

    def test_pid_exists_2(self):
        reap_children()
        pids = psutil.get_pid_list()
        for pid in pids:
            try:
                assert psutil.pid_exists(pid)
            except AssertionError:
                # in case the process disappeared in meantime fail only
                # if it is no longer in get_pid_list()
                time.sleep(.1)
                if pid in psutil.get_pid_list():
                    self.fail(pid)
        # pids well above the highest one in use are expected not to exist
        pids = range(max(pids) + 5000, max(pids) + 6000)
        for pid in pids:
            self.assertFalse(psutil.pid_exists(pid))

    def test_get_pid_list(self):
        plist = [x.pid for x in psutil.process_iter()]
        pidlist = psutil.get_pid_list()
        # list.sort() returns None: compare sorted copies so that the
        # two pid lists are actually checked against each other
        self.assertEqual(sorted(plist), sorted(pidlist))
        # make sure every pid is unique
        self.assertEqual(len(pidlist), len(set(pidlist)))

    def test_test(self):
        # test for psutil.test() function
        stdout = sys.stdout
        sys.stdout = DEVNULL
        try:
            psutil.test()
        finally:
            sys.stdout = stdout

    def test_sys_cpu_times(self):
        total = 0
        times = psutil.cpu_times()
        sum(times)
        for cp_time in times:
            self.assertIsInstance(cp_time, float)
            self.assertGreaterEqual(cp_time, 0.0)
            total += cp_time
        self.assertEqual(total, sum(times))
        str(times)

    def test_sys_cpu_times2(self):
        # system CPU times must grow as wall clock time goes by
        t1 = sum(psutil.cpu_times())
        time.sleep(0.1)
        t2 = sum(psutil.cpu_times())
        difference = t2 - t1
        if not difference >= 0.05:
            self.fail("difference %s" % difference)

    def test_sys_per_cpu_times(self):
        for times in psutil.cpu_times(percpu=True):
            total = 0
            sum(times)
            for cp_time in times:
                self.assertIsInstance(cp_time, float)
                self.assertGreaterEqual(cp_time, 0.0)
                total += cp_time
            self.assertEqual(total, sum(times))
            str(times)
        self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
                         len(psutil.cpu_times(percpu=False)))

    def test_sys_per_cpu_times2(self):
        # within 0.1 secs at least one CPU must report an increase of
        # 0.05 secs or more
        tot1 = psutil.cpu_times(percpu=True)
        stop_at = time.time() + 0.1
        while 1:
            if time.time() >= stop_at:
                break
            tot2 = psutil.cpu_times(percpu=True)
            for t1, t2 in zip(tot1, tot2):
                t1, t2 = sum(t1), sum(t2)
                difference = t2 - t1
                if difference >= 0.05:
                    return
        self.fail()

    def _test_cpu_percent(self, percent):
        # shared check: a float between 0.0 and 100.0
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        self.assertLessEqual(percent, 100.0)

    def test_sys_cpu_percent(self):
        psutil.cpu_percent(interval=0.001)
        for x in range(1000):
            self._test_cpu_percent(psutil.cpu_percent(interval=None))

    def test_sys_per_cpu_percent(self):
        self.assertEqual(len(psutil.cpu_percent(interval=0.001, percpu=True)),
                         psutil.NUM_CPUS)
        for x in range(1000):
            percents = psutil.cpu_percent(interval=None, percpu=True)
            for percent in percents:
                self._test_cpu_percent(percent)

    def test_sys_cpu_times_percent(self):
        psutil.cpu_times_percent(interval=0.001)
        for x in range(1000):
            cpu = psutil.cpu_times_percent(interval=None)
            for percent in cpu:
                self._test_cpu_percent(percent)
            self._test_cpu_percent(sum(cpu))

    def test_sys_per_cpu_times_percent(self):
        self.assertEqual(len(psutil.cpu_times_percent(interval=0.001,
                                                      percpu=True)),
                         psutil.NUM_CPUS)
        for x in range(1000):
            cpus = psutil.cpu_times_percent(interval=None, percpu=True)
            for cpu in cpus:
                for percent in cpu:
                    self._test_cpu_percent(percent)
                self._test_cpu_percent(sum(cpu))

    @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                     "os.statvfs() function not available on this platform")
    def test_disk_usage(self):
        usage = psutil.disk_usage(os.getcwd())
        assert usage.total > 0, usage
        assert usage.used > 0, usage
        assert usage.free > 0, usage
        assert usage.total > usage.used, usage
        assert usage.total > usage.free, usage
        assert 0 <= usage.percent <= 100, usage.percent

        # if path does not exist OSError ENOENT is expected across
        # all platforms
        fname = tempfile.mktemp()
        try:
            psutil.disk_usage(fname)
        except OSError:
            err = sys.exc_info()[1]
            if err.args[0] != errno.ENOENT:
                raise
        else:
            self.fail("OSError not raised")

    @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
                     "os.statvfs() function not available on this platform")
    def test_disk_partitions(self):
        # all = False
        for disk in psutil.disk_partitions(all=False):
            if WINDOWS and 'cdrom' in disk.opts:
                continue
            if not POSIX:
                assert os.path.exists(disk.device), disk
            else:
                # we cannot make any assumption about this, see:
                # http://goo.gl/p9c43
                disk.device
            if SUNOS:
                # on solaris apparently mount points can also be files
                assert os.path.exists(disk.mountpoint), disk
            else:
                assert os.path.isdir(disk.mountpoint), disk
            assert disk.fstype, disk
            self.assertIsInstance(disk.opts, str)

        # all = True
        for disk in psutil.disk_partitions(all=True):
            if not WINDOWS:
                try:
                    os.stat(disk.mountpoint)
                except OSError:
                    # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
                    err = sys.exc_info()[1]
                    if err.errno not in (errno.EPERM, errno.EACCES):
                        raise
                else:
                    if SUNOS:
                        # on solaris apparently mount points can also be files
                        assert os.path.exists(disk.mountpoint), disk
                    else:
                        assert os.path.isdir(disk.mountpoint), disk
            self.assertIsInstance(disk.fstype, str)
            self.assertIsInstance(disk.opts, str)

        def find_mount_point(path):
            # walk up the directory tree until a mount point is found
            path = os.path.abspath(path)
            while not os.path.ismount(path):
                path = os.path.dirname(path)
            return path

        # the mount point this very file lives on must be listed
        mount = find_mount_point(__file__)
        mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
        self.assertIn(mount, mounts)
        psutil.disk_usage(mount)

    def test_net_io_counters(self):
        def check_ntuple(nt):
            # fields must be reachable both by index and by name
            self.assertEqual(nt[0], nt.bytes_sent)
            self.assertEqual(nt[1], nt.bytes_recv)
            self.assertEqual(nt[2], nt.packets_sent)
            self.assertEqual(nt[3], nt.packets_recv)
            self.assertEqual(nt[4], nt.errin)
            self.assertEqual(nt[5], nt.errout)
            self.assertEqual(nt[6], nt.dropin)
            self.assertEqual(nt[7], nt.dropout)
            assert nt.bytes_sent >= 0, nt
            assert nt.bytes_recv >= 0, nt
            assert nt.packets_sent >= 0, nt
            assert nt.packets_recv >= 0, nt
            assert nt.errin >= 0, nt
            assert nt.errout >= 0, nt
            assert nt.dropin >= 0, nt
            assert nt.dropout >= 0, nt

        ret = psutil.net_io_counters(pernic=False)
        check_ntuple(ret)
        ret = psutil.net_io_counters(pernic=True)
        assert ret != []
        for key in ret:
            assert key
            check_ntuple(ret[key])

    def test_disk_io_counters(self):
        def check_ntuple(nt):
            # fields must be reachable both by index and by name
            self.assertEqual(nt[0], nt.read_count)
            self.assertEqual(nt[1], nt.write_count)
            self.assertEqual(nt[2], nt.read_bytes)
            self.assertEqual(nt[3], nt.write_bytes)
            self.assertEqual(nt[4], nt.read_time)
            self.assertEqual(nt[5], nt.write_time)
            assert nt.read_count >= 0, nt
            assert nt.write_count >= 0, nt
            assert nt.read_bytes >= 0, nt
            assert nt.write_bytes >= 0, nt
            assert nt.read_time >= 0, nt
            assert nt.write_time >= 0, nt

        ret = psutil.disk_io_counters(perdisk=False)
        check_ntuple(ret)
        ret = psutil.disk_io_counters(perdisk=True)
        # make sure there are no duplicates
        self.assertEqual(len(ret), len(set(ret)))
        for key in ret:
            assert key, key
            check_ntuple(ret[key])
            if LINUX and key[-1].isdigit():
                # if 'sda1' is listed 'sda' shouldn't, see:
                # http://code.google.com/p/psutil/issues/detail?id=338
                while key[-1].isdigit():
                    key = key[:-1]
                self.assertNotIn(key, ret.keys())

    def test_get_users(self):
        users = psutil.get_users()
        assert users
        for user in users:
            assert user.name, user
            user.terminal
            user.host
            assert user.started > 0.0, user
            # must be a timestamp datetime is able to digest
            datetime.datetime.fromtimestamp(user.started)
818 # ===================================================================
819 # --- psutil.Process class tests
820 # ===================================================================
822 class TestProcess(unittest.TestCase):
823 """Tests for psutil.Process class."""
def setUp(self):
    # start every test without the scratch file around
    safe_remove(TESTFN)
def tearDown(self):
    # kill any subprocess the test left behind
    reap_children()
def test_kill(self):
    # a process which has been kill()ed and waited for must be gone
    sproc = get_test_subprocess(wait=True)
    test_pid = sproc.pid
    p = psutil.Process(test_pid)
    name = p.name
    p.kill()
    p.wait()
    # The pid may already belong to some other process: fail only if it
    # exists *and* the name recorded above matches the interpreter.
    # PYTHON is a full path, so it is its basename which gets compared
    # (presumably p.name carries no directory part -- see psutil docs).
    same_program = os.path.normcase(name) == \
        os.path.normcase(os.path.basename(PYTHON))
    self.assertFalse(psutil.pid_exists(test_pid) and same_program)
def test_terminate(self):
    # a process which has been terminate()d and waited for must be gone
    sproc = get_test_subprocess(wait=True)
    test_pid = sproc.pid
    p = psutil.Process(test_pid)
    name = p.name
    p.terminate()
    p.wait()
    # The pid may already belong to some other process: fail only if it
    # exists *and* the name recorded above matches the interpreter.
    # PYTHON is a full path, so it is its basename which gets compared
    # (presumably p.name carries no directory part -- see psutil docs).
    same_program = os.path.normcase(name) == \
        os.path.normcase(os.path.basename(PYTHON))
    self.assertFalse(psutil.pid_exists(test_pid) and same_program)
def test_send_signal(self):
    # a process which received a deadly signal must be gone;
    # SIGKILL is used on POSIX, SIGTERM elsewhere
    if POSIX:
        sig = signal.SIGKILL
    else:
        sig = signal.SIGTERM
    sproc = get_test_subprocess()
    test_pid = sproc.pid
    p = psutil.Process(test_pid)
    name = p.name
    p.send_signal(sig)
    p.wait()
    # The pid may already belong to some other process: fail only if it
    # exists *and* the name recorded above matches the interpreter.
    # PYTHON is a full path, so it is its basename which gets compared
    # (presumably p.name carries no directory part -- see psutil docs).
    same_program = os.path.normcase(name) == \
        os.path.normcase(os.path.basename(PYTHON))
    self.assertFalse(psutil.pid_exists(test_pid) and same_program)
def test_wait(self):
    on_posix = os.name == 'posix'

    # check exit code signal: on POSIX wait() is expected to return the
    # number of the signal which ended the process, elsewhere 0
    for method, signame in (('kill', 'SIGKILL'), ('terminate', 'SIGTERM')):
        sproc = get_test_subprocess()
        p = psutil.Process(sproc.pid)
        getattr(p, method)()
        code = p.wait()
        if on_posix:
            # looked up lazily: the constant is only needed on POSIX
            self.assertEqual(code, getattr(signal, signame))
        else:
            self.assertEqual(code, 0)
        self.assertFalse(p.is_running())

    # check sys.exit() code
    code = "import time, sys; time.sleep(0.01); sys.exit(5);"
    sproc = get_test_subprocess([PYTHON, "-c", code])
    p = psutil.Process(sproc.pid)
    self.assertEqual(p.wait(), 5)
    self.assertFalse(p.is_running())

    # Test wait() issued twice.
    # It is not supposed to raise NSP when the process is gone.
    # On UNIX this should return None, on Windows it should keep
    # returning the exit code.
    sproc = get_test_subprocess([PYTHON, "-c", code])
    p = psutil.Process(sproc.pid)
    self.assertEqual(p.wait(), 5)
    self.assertIn(p.wait(), (5, None))

    # test timeout
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    p.name
    self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)

    # timeout < 0 not allowed
    self.assertRaises(ValueError, p.wait, -1)
@unittest.skipUnless(POSIX, '')  # XXX why is this skipped on Windows?
def test_wait_non_children(self):
    # test wait() against processes which are not our children:
    # the child spawns a grandchild and prints its pid on stdout
    code = "".join([
        "import sys;",
        "from subprocess import Popen, PIPE;",
        "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" % PYTHON,
        "sp = Popen(cmd, stdout=PIPE);",
        "sys.stdout.write(str(sp.pid));",
    ])
    sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE)

    grandson_proc = psutil.Process(int(sproc.stdout.read()))
    try:
        self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
        grandson_proc.kill()
        # no exit code is expected for a process which is not our child
        self.assertEqual(grandson_proc.wait(), None)
    finally:
        # don't leave the grandchild around in case of failure
        if grandson_proc.is_running():
            grandson_proc.kill()
            grandson_proc.wait()
def test_wait_timeout_0(self):
    # wait(0) must not block: it raises TimeoutExpired for as long as
    # the process is alive
    sproc = get_test_subprocess()
    p = psutil.Process(sproc.pid)
    self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
    p.kill()
    # poll for up to 2 seconds until the exit code becomes available
    deadline = time.time() + 2
    while True:
        try:
            code = p.wait(0)
            break
        except psutil.TimeoutExpired:
            if time.time() >= deadline:
                raise
    if os.name == 'posix':
        expected = signal.SIGKILL
    else:
        expected = 0
    self.assertEqual(code, expected)
    self.assertFalse(p.is_running())
def test_cpu_percent(self):
    proc = psutil.Process(os.getpid())
    # two blocking calls first, then non-blocking ones only
    for _ in range(2):
        proc.get_cpu_percent(interval=0.001)
    for _ in range(100):
        percent = proc.get_cpu_percent(interval=None)
        self.assertIsInstance(percent, float)
        self.assertGreaterEqual(percent, 0.0)
        if os.name == 'posix':
            # no upper bound is enforced on POSIX
            self.assertGreaterEqual(percent, 0.0)
        else:
            self.assertLessEqual(percent, 100.0)
def test_cpu_times(self):
    times = psutil.Process(os.getpid()).get_cpu_times()
    # this very process must have consumed some CPU time by now
    assert (times.user > 0.0) or (times.system > 0.0), times
    # make sure returned values can be pretty printed with strftime
    for secs in (times.user, times.system):
        time.strftime("%H:%M:%S", time.localtime(secs))
971 # Test Process.cpu_times() against os.times()
972 # os.times() is broken on Python 2.6
973 # http://bugs.python.org/issue1040026
974 # XXX fails on OSX: not sure if it's for os.times(). We should
975 # try this with Python 2.7 and re-enable the test.
@unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
                     'os.times() is not reliable on this Python version')
def test_cpu_times2(self):
    user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
    utime, ktime = os.times()[:2]

    # Use os.times()[:2] as base values to compare our results
    # using a tolerance of +/- 0.1 seconds.
    # It will fail if the difference between the values is > 0.1s.
    if abs(user_time - utime) > 0.1:
        self.fail("expected: %s, found: %s" % (utime, user_time))

    if abs(kernel_time - ktime) > 0.1:
        self.fail("expected: %s, found: %s" % (ktime, kernel_time))
def test_create_time(self):
    sproc = get_test_subprocess(wait=True)
    now = time.time()
    p = psutil.Process(sproc.pid)
    created = p.create_time

    # Use time.time() as base value to compare our result: the process
    # has just been spawned, so the two values must not be more than
    # 2 seconds apart.
    difference = abs(created - now)
    if difference > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (now, created, difference))

    # make sure returned value can be pretty printed with strftime
    time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
# the test is skipped *on* Windows, hence the reason must say so
@unittest.skipIf(WINDOWS, 'not available on Windows')
def test_terminal(self):
    terminal = psutil.Process(os.getpid()).terminal
    if sys.stdin.isatty():
        # must agree with what the "tty" command reports
        self.assertEqual(terminal, sh('tty'))
    else:
        assert terminal, repr(terminal)
@unittest.skipIf(not hasattr(psutil.Process, 'get_io_counters'),
                 'not available on this platform')
@skip_on_not_implemented(only_if=LINUX)
def test_get_io_counters(self):
    p = psutil.Process(os.getpid())

    # test reads: read the whole interpreter binary and compare the
    # counters taken before and after
    before = p.get_io_counters()
    f = open(PYTHON, 'rb')
    f.read()
    f.close()
    after = p.get_io_counters()
    if not BSD:
        assert after.read_count > before.read_count, (before, after)
        self.assertEqual(after.write_count, before.write_count)
    assert after.read_bytes >= before.read_bytes, (before, after)
    assert after.write_bytes >= before.write_bytes, (before, after)

    # test writes: dump 1000000 bytes to a temporary file
    before = p.get_io_counters()
    if PY3:
        payload = bytes("x" * 1000000, 'ascii')
    else:
        payload = "x" * 1000000
    f = tempfile.TemporaryFile()
    f.write(payload)
    f.close()
    after = p.get_io_counters()
    assert after.write_count >= before.write_count, (before, after)
    assert after.write_bytes >= before.write_bytes, (before, after)
    assert after.read_count >= before.read_count, (before, after)
    assert after.read_bytes >= before.read_bytes, (before, after)
1047 # Linux and Windows Vista+
@unittest.skipUnless(hasattr(psutil.Process, 'get_ionice'),
                     'Linux and Windows Vista only')
def test_get_set_ionice(self):
    if LINUX:
        from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
                            IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
        self.assertEqual(IOPRIO_CLASS_NONE, 0)
        self.assertEqual(IOPRIO_CLASS_RT, 1)
        self.assertEqual(IOPRIO_CLASS_BE, 2)
        self.assertEqual(IOPRIO_CLASS_IDLE, 3)
        p = psutil.Process(os.getpid())
        # (arguments for set_ionice(), (ioclass, value) expected back)
        cases = (
            ((2,), (2, 4)),
            ((3,), (3, 0)),
            ((2, 0), (2, 0)),
            ((2, 7), (2, 7)),
        )
        try:
            for args, expected in cases:
                p.set_ionice(*args)
                ioclass, value = p.get_ionice()
                self.assertEqual(ioclass, expected[0])
                self.assertEqual(value, expected[1])
            self.assertRaises(ValueError, p.set_ionice, 2, 10)
        finally:
            p.set_ionice(IOPRIO_CLASS_NONE)
    else:
        p = psutil.Process(os.getpid())
        original = p.get_ionice()
        try:
            # pick a priority which differs from the current one:
            # 0 is "very low", 1 is "low"
            if original == 0:
                value = 1
            else:
                value = 0
            p.set_ionice(value)
            self.assertEqual(p.get_ionice(), value)
        finally:
            # put the original priority back
            p.set_ionice(original)

        self.assertRaises(ValueError, p.set_ionice, 3)
        self.assertRaises(TypeError, p.set_ionice, 2, 1)
def test_get_num_threads(self):
    # On certain platforms such as Linux we might test for an exact
    # thread number, since we always start with 1 thread per process,
    # but this does not apply across all platforms (OSX, Windows).
    # Hence we only check that a new thread bumps the count by one.
    proc = psutil.Process(os.getpid())
    baseline = proc.get_num_threads()

    worker = ThreadTask()
    worker.start()
    try:
        with_worker = proc.get_num_threads()
        self.assertEqual(with_worker, baseline + 1)
        worker.stop()
    finally:
        # make sure the helper thread is stopped on failure as well
        if worker._running:
            worker.stop()
@unittest.skipUnless(WINDOWS, 'Windows only')
def test_get_num_handles(self):
    # Smoke test only; a better test is done later into test/_windows.py
    handles = psutil.Process(os.getpid()).get_num_handles()
    self.assertGreater(handles, 0)
def test_get_threads(self):
    # Starting one extra thread must add exactly one entry to the
    # list returned by Process.get_threads().
    proc = psutil.Process(os.getpid())
    threads_before = proc.get_threads()

    worker = ThreadTask()
    worker.start()

    try:
        threads_after = proc.get_threads()
        self.assertEqual(len(threads_after), len(threads_before) + 1)
        # on Linux, first thread id is supposed to be this process
        if LINUX:
            self.assertEqual(threads_after[0].id, os.getpid())
        first = threads_after[0]
        # named tuple: every field is reachable by name and by position
        for position, field in enumerate(('id', 'user_time', 'system_time')):
            self.assertEqual(getattr(first, field), first[position])
        worker.stop()
    finally:
        # make sure the helper thread is stopped on failure as well
        if worker._running:
            worker.stop()
def test_get_memory_info(self):
    # Memory figures of the current process are expected to grow
    # after a big list gets allocated.
    proc = psutil.Process(os.getpid())

    # baseline values
    rss_before, vms_before = proc.get_memory_info()
    percent_before = proc.get_memory_percent()
    self.assertGreater(rss_before, 0)
    self.assertGreater(vms_before, 0)

    # allocate some memory and keep a reference to it while measuring
    memarr = [None] * 1500000

    rss_after, vms_after = proc.get_memory_info()
    percent_after = proc.get_memory_percent()
    # make sure that the memory usage bumped up
    self.assertGreater(rss_after, rss_before)
    self.assertGreaterEqual(vms_after, vms_before)  # vms might be equal
    self.assertGreater(percent_after, percent_before)
    del memarr
1163 # def test_get_ext_memory_info(self):
1164 # # tested later in fetch all test suite
def test_get_memory_maps(self):
    # Sanity checks on both the grouped and the non grouped memory
    # maps of the current process.
    proc = psutil.Process(os.getpid())
    grouped = proc.get_memory_maps()
    entries = list(grouped)
    # no duplicated entries are expected
    self.assertEqual(len(entries), len(set(entries)))
    ungrouped = proc.get_memory_maps(grouped=False)

    for region in grouped:
        # names such as "[heap]" do not refer to a real file
        if region.path.startswith('['):
            continue
        assert os.path.isabs(region.path), region.path
        if POSIX:
            assert os.path.exists(region.path), region.path
        elif '64' not in os.path.basename(region.path):
            # XXX - On Windows we have this strange behavior with
            # 64 bit dlls: they are visible via explorer but cannot
            # be accessed via os.stat() (wtf?).
            assert os.path.exists(region.path), region.path

    for region in ungrouped:
        for fname in region._fields:
            value = getattr(region, fname)
            if fname == 'path':
                continue
            if fname in ('addr', 'perms'):
                assert value, value
            else:
                # every other field is a non negative integer
                self.assertIsInstance(value, (int, long))
                assert value >= 0, value
def test_get_memory_percent(self):
    # the current process always uses some memory
    percent = psutil.Process(os.getpid()).get_memory_percent()
    self.assertGreater(percent, 0.0)
def test_pid(self):
    # Process.pid must echo the PID the object was created with
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    self.assertEqual(proc.pid, child.pid)
def test_is_running(self):
    # is_running() is queried twice in a row both before and after
    # the process gets killed, exactly as many times as before.
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
def test_exe(self):
    # The executable of a subprocess running the test interpreter is
    # expected to be PYTHON, modulo case (Windows) and version suffix.
    child = get_test_subprocess(wait=True)
    found = psutil.Process(child.pid).exe
    try:
        self.assertEqual(found, PYTHON)
    except AssertionError:
        same_length = len(found) == len(PYTHON)
        if WINDOWS and same_length:
            # on Windows we don't care about case sensitivity
            self.assertEqual(found.lower(), PYTHON.lower())
        else:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            version = "%s.%s" % (sys.version_info[0], sys.version_info[1])
            stripped_found = found.replace(version, '')
            stripped_python = PYTHON.replace(version, '')
            self.assertEqual(stripped_found, stripped_python)
def test_cmdline(self):
    # the command line reported for a subprocess must match the one
    # which was used to spawn it
    expected = [PYTHON, "-c", "import time; time.sleep(2)"]
    child = get_test_subprocess(expected, wait=True)
    reported = psutil.Process(child.pid).cmdline
    self.assertEqual(' '.join(reported), ' '.join(expected))
def test_name(self):
    # the (lowercased) process name must be a prefix of the
    # interpreter's file name
    child = get_test_subprocess(PYTHON, wait=True)
    name = psutil.Process(child.pid).name.lower()
    interpreter = os.path.realpath(sys.executable)
    pyexe = os.path.basename(interpreter).lower()
    assert pyexe.startswith(name), (pyexe, name)
@unittest.skipUnless(POSIX, 'posix only')
def test_uids(self):
    # compare the uids of the current process against the os module
    ruid, euid, suid = psutil.Process(os.getpid()).uids
    # os.getuid() refers to "real" uid
    self.assertEqual(ruid, os.getuid())
    # os.geteuid() refers to "effective" uid
    self.assertEqual(euid, os.geteuid())
    # no such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid()[2]
    if hasattr(os, "getresuid"):
        self.assertEqual(suid, os.getresuid()[2])
@unittest.skipUnless(POSIX, 'posix only')
def test_gids(self):
    # compare the gids of the current process against the os module
    p = psutil.Process(os.getpid())
    real, effective, saved = p.gids
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # no such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid()[2]; guard on the very
    # function we are about to call (it used to test "getresuid")
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
def test_nice(self):
    # Change the priority of the current process and read it back,
    # restoring the initial value once done.
    p = psutil.Process(os.getpid())
    self.assertRaises(TypeError, p.set_nice, "str")
    if os.name == 'nt':
        try:
            self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
            p.set_nice(psutil.HIGH_PRIORITY_CLASS)
            self.assertEqual(p.get_nice(), psutil.HIGH_PRIORITY_CLASS)
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
            self.assertEqual(p.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
        finally:
            p.set_nice(psutil.NORMAL_PRIORITY_CLASS)
    else:
        # Read the initial value *before* entering the try block: it
        # used to be assigned inside of it, so a failing get_nice()
        # made the cleanup below blow up with a NameError.
        first_nice = p.get_nice()
        try:
            try:
                p.set_nice(1)
                self.assertEqual(p.get_nice(), 1)
                # going back to previous nice value raises AccessDenied
                # on OSX
                if not OSX:
                    p.set_nice(0)
                    self.assertEqual(p.get_nice(), 0)
            except psutil.AccessDenied:
                pass
        finally:
            # best effort: lowering the nice value may not be permitted
            try:
                p.set_nice(first_nice)
            except psutil.AccessDenied:
                pass
def test_status(self):
    # the process running this test is, by definition, running
    status = psutil.Process(os.getpid()).status
    self.assertEqual(status, psutil.STATUS_RUNNING)
    self.assertEqual(str(status), "running")
def test_status_constants(self):
    # STATUS_* constants are supposed to be comparable also by
    # using their str representation. Both == and != are exercised
    # explicitly against matching and non matching values.
    running = psutil.STATUS_RUNNING
    for same in (0, long(0), 'running'):
        self.assertTrue(running == same)
    for other in (1, 'sleeping'):
        self.assertFalse(running == other)
    for same in (0, 'running'):
        self.assertFalse(running != same)
    for other in (1, 'sleeping'):
        self.assertTrue(running != other)
def test_username(self):
    # the subprocess is owned by the user running the test suite
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    if POSIX:
        import pwd
        expected = pwd.getpwuid(os.getuid()).pw_name
        self.assertEqual(proc.username, expected)
    elif WINDOWS and 'USERNAME' in os.environ:
        # the value is split on the backslash into domain and user name
        expected_username = os.environ['USERNAME']
        expected_domain = os.environ['USERDOMAIN']
        domain, username = proc.username.split('\\')
        self.assertEqual(domain, expected_domain)
        self.assertEqual(username, expected_username)
    else:
        # nothing to compare against: just make sure it does not raise
        proc.username
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd(self):
    # the subprocess is compared against our own working directory
    child = get_test_subprocess(wait=True)
    child_cwd = psutil.Process(child.pid).getcwd()
    self.assertEqual(child_cwd, os.getcwd())
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd_2(self):
    # the subprocess moves one directory up; poll getcwd() until it
    # reports the parent of our own working directory
    script = "import os, time; os.chdir('..'); time.sleep(2)"
    child = get_test_subprocess([PYTHON, "-c", script], wait=True)
    proc = psutil.Process(child.pid)
    call_until(proc.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1)
@unittest.skipIf(not hasattr(psutil.Process, "get_cpu_affinity"),
                 'not available on this platform')
def test_cpu_affinity(self):
    # Pin the current process to every single CPU in turn, then to
    # all of them, reading the affinity back after each change.
    p = psutil.Process(os.getpid())
    initial = p.get_cpu_affinity()
    all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))

    try:
        for n in all_cpus:
            p.set_cpu_affinity([n])
            self.assertEqual(p.get_cpu_affinity(), [n])

        p.set_cpu_affinity(all_cpus)
        self.assertEqual(p.get_cpu_affinity(), all_cpus)
    finally:
        # Restore the original affinity even if an assertion failed,
        # otherwise the rest of the run stays pinned to a single CPU.
        p.set_cpu_affinity(initial)

    # a CPU number beyond the available ones must be rejected
    invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
    self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu)
def test_get_open_files(self):
    # Files opened by the current process and by a subprocess must
    # show up in get_open_files().

    # --- current process
    p = psutil.Process(os.getpid())
    files = p.get_open_files()
    # get_open_files() returns tuples exposing a "path" field (see the
    # list comprehensions below), so TESTFN has to be looked up among
    # the paths; "TESTFN in files" compared a string against tuples
    # and could never be true.
    self.assertFalse(TESTFN in [x.path for x in files])
    f = open(TESTFN, 'w')
    try:
        call_until(p.get_open_files, "len(ret) != %i" % len(files))
        filenames = [x.path for x in p.get_open_files()]
        self.assertIn(TESTFN, filenames)
    finally:
        # do not leak the file handle if one of the checks above fails
        f.close()
    for fname in filenames:
        assert os.path.isfile(fname), fname

    # --- another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN
    sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
    p = psutil.Process(sproc.pid)
    # give the subprocess up to ~1 second to open the file
    for _ in range(100):
        filenames = [x.path for x in p.get_open_files()]
        if TESTFN in filenames:
            break
        time.sleep(.01)
    else:
        # never showed up: fail with a meaningful message
        self.assertIn(TESTFN, filenames)
    for fname in filenames:
        assert os.path.isfile(fname), fname
def test_get_open_files2(self):
    # test fd and path fields
    fileobj = open(TESTFN, 'w')
    try:
        p = psutil.Process(os.getpid())
        for path, fd in p.get_open_files():
            if path == fileobj.name or fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.get_open_files()))
        self.assertEqual(path, fileobj.name)
        if WINDOWS:
            self.assertEqual(fd, -1)
        else:
            self.assertEqual(fd, fileobj.fileno())
        # test positions
        ntuple = p.get_open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
        # test file is gone
        fileobj.close()
        # Compare against the *paths*: the previous check looked for a
        # string among (path, fd) tuples and hence always passed.
        remaining = [x.path for x in p.get_open_files()]
        self.assertTrue(fileobj.name not in remaining)
    finally:
        # closing twice is harmless and avoids leaking the handle when
        # one of the assertions above fails
        fileobj.close()
def test_connection_constants(self):
    # every CONN_* constant has an upper case string representation
    # and neither values nor representations are duplicated
    seen_values = []
    seen_reprs = []
    for name in dir(psutil):
        if not name.startswith('CONN_'):
            continue
        value = getattr(psutil, name)
        text = str(value)
        assert text.isupper(), text
        assert text not in seen_reprs, text
        assert value not in seen_values, value
        seen_values.append(value)
        seen_reprs.append(text)
    # platform specific constants: plain attribute access, which
    # raises AttributeError in case they are missing
    if SUNOS:
        psutil.CONN_IDLE
        psutil.CONN_BOUND
    if WINDOWS:
        psutil.CONN_DELETE_TCB
def test_get_connections(self):
    # A subprocess listening on a TCP socket must expose exactly one
    # connection, in LISTEN state, bound to 127.0.0.1.
    arg = "import socket, time;" \
          "s = socket.socket();" \
          "s.bind(('127.0.0.1', 0));" \
          "s.listen(1);" \
          "conn, addr = s.accept();" \
          "time.sleep(2);"
    sproc = get_test_subprocess([PYTHON, "-c", arg])
    p = psutil.Process(sproc.pid)
    # Start from an empty list so that, if the socket never shows up,
    # the length assertion below fails cleanly instead of the test
    # erroring out with a NameError on "cons".
    cons = []
    for _ in range(100):
        if p.get_connections():
            # give the subprocess some more time to bind()
            time.sleep(.01)
            cons = p.get_connections()
            break
        time.sleep(.01)
    self.assertEqual(len(cons), 1)
    con = cons[0]
    self.assertEqual(con.family, socket.AF_INET)
    self.assertEqual(con.type, socket.SOCK_STREAM)
    self.assertEqual(con.status, psutil.CONN_LISTEN, str(con.status))
    ip, port = con.laddr
    self.assertEqual(ip, '127.0.0.1')
    self.assertEqual(con.raddr, ())
    if WINDOWS or SUNOS:
        self.assertEqual(con.fd, -1)
    else:
        assert con.fd > 0, con
    # test positions
    self.assertEqual(con[0], con.fd)
    self.assertEqual(con[1], con.family)
    self.assertEqual(con[2], con.type)
    self.assertEqual(con[3], con.laddr)
    self.assertEqual(con[4], con.raddr)
    self.assertEqual(con[5], con.status)
    # test kind arg
    self.assertRaises(ValueError, p.get_connections, 'foo')
@unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
def test_get_connections_ipv6(self):
    # a listening IPv6 socket of the current process must be listed
    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        s.bind(('::1', 0))
        s.listen(1)
        cons = psutil.Process(os.getpid()).get_connections()
    finally:
        # close the socket also when bind() or get_connections() fail
        s.close()
    self.assertEqual(len(cons), 1)
    self.assertEqual(cons[0].laddr[0], '::1')
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'AF_UNIX is not supported')
def test_get_connections_unix(self):
    # UNIX sockets bound by the current process must be listed by
    # get_connections(kind='unix'). Each socket is closed in a finally
    # clause so that a failing assertion does not leak it.
    p = psutil.Process(os.getpid())

    # stream socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(TESTFN)
        conn = p.get_connections(kind='unix')[0]
        if conn.fd != -1:  # != sunos and windows
            self.assertEqual(conn.fd, sock.fileno())
        self.assertEqual(conn.family, socket.AF_UNIX)
        self.assertEqual(conn.type, socket.SOCK_STREAM)
        self.assertEqual(conn.laddr, TESTFN)
        self.assertTrue(not conn.raddr)
        self.assertEqual(conn.status, psutil.CONN_NONE, str(conn.status))
    finally:
        sock.close()

    # datagram socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind(TESTFN)
        conn = p.get_connections(kind='unix')[0]
        self.assertEqual(conn.type, socket.SOCK_DGRAM)
    finally:
        sock.close()
# The skip reasons below used to be misleading: one was misspelled and
# the other one claimed the fd *was* available on the skipped platforms.
@unittest.skipUnless(hasattr(socket, "fromfd"),
                     'socket.fromfd() is not available')
@unittest.skipIf(WINDOWS or SUNOS,
                 'connection fd not available on this platform')
def test_connection_fromfd(self):
    # The fd reported by get_connections() must be usable to rebuild
    # an equivalent socket object by means of socket.fromfd().
    sock = socket.socket()
    sock.bind(('localhost', 0))
    sock.listen(1)
    p = psutil.Process(os.getpid())
    for conn in p.get_connections():
        if conn.fd == sock.fileno():
            break
    else:
        sock.close()
        self.fail("couldn't find socket fd")
    dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
    try:
        self.assertEqual(dupsock.getsockname(), conn.laddr)
        # the rebuilt socket is expected to use a different fd
        self.assertNotEqual(sock.fileno(), dupsock.fileno())
    finally:
        sock.close()
        dupsock.close()
def test_get_connections_all(self):
    # Spawn subprocesses listening on TCP/UDP sockets over IPv4 (and
    # IPv6, if supported) and check what get_connections() reports
    # for each of them, for every supported "kind" filter.
    from string import Template

    tcp_template = Template(
        "import socket;"
        "s = socket.socket($family, socket.SOCK_STREAM);"
        "s.bind(('$addr', 0));"
        "s.listen(1);"
        "conn, addr = s.accept();")
    udp_template = Template(
        "import socket, time;"
        "s = socket.socket($family, socket.SOCK_DGRAM);"
        "s.bind(('$addr', 0));"
        "time.sleep(2);")

    tcp4_code = tcp_template.substitute(family=socket.AF_INET,
                                        addr="127.0.0.1")
    udp4_code = udp_template.substitute(family=socket.AF_INET,
                                        addr="127.0.0.1")
    tcp6_code = tcp_template.substitute(family=socket.AF_INET6,
                                        addr="::1")
    udp6_code = udp_template.substitute(family=socket.AF_INET6,
                                        addr="::1")

    # launch various subprocess instantiating a socket of various
    # families and types to enrich psutil results
    tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_code])
    udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_code])
    tcp6_proc = None
    udp6_proc = None
    if supports_ipv6():
        tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_code])
        udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_code])

    all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
                 "udp", "udp4", "udp6")

    def check_kinds(proc, matching):
        # only the kinds listed in "matching" may return connections
        for kind in all_kinds:
            cons = proc.get_connections(kind=kind)
            if kind in matching:
                assert cons != [], cons
            else:
                self.assertEqual(cons, [], cons)

    # check matches against subprocesses just created
    for child in psutil.Process(os.getpid()).get_children():
        for conn in child.get_connections():
            if child.pid == tcp4_proc.pid:
                # TCP v4
                self.assertEqual(conn.family, socket.AF_INET)
                self.assertEqual(conn.type, socket.SOCK_STREAM)
                self.assertEqual(conn.laddr[0], "127.0.0.1")
                self.assertEqual(conn.raddr, ())
                self.assertEqual(conn.status, psutil.CONN_LISTEN,
                                 str(conn.status))
                check_kinds(child, ("all", "inet", "inet4", "tcp", "tcp4"))
            elif child.pid == udp4_proc.pid:
                # UDP v4
                self.assertEqual(conn.family, socket.AF_INET)
                self.assertEqual(conn.type, socket.SOCK_DGRAM)
                self.assertEqual(conn.laddr[0], "127.0.0.1")
                self.assertEqual(conn.raddr, ())
                self.assertEqual(conn.status, psutil.CONN_NONE,
                                 str(conn.status))
                check_kinds(child, ("all", "inet", "inet4", "udp", "udp4"))
            elif child.pid == getattr(tcp6_proc, "pid", None):
                # TCP v6
                self.assertEqual(conn.family, socket.AF_INET6)
                self.assertEqual(conn.type, socket.SOCK_STREAM)
                self.assertIn(conn.laddr[0], ("::", "::1"))
                self.assertEqual(conn.raddr, ())
                self.assertEqual(conn.status, psutil.CONN_LISTEN,
                                 str(conn.status))
                check_kinds(child, ("all", "inet", "inet6", "tcp", "tcp6"))
            elif child.pid == getattr(udp6_proc, "pid", None):
                # UDP v6
                self.assertEqual(conn.family, socket.AF_INET6)
                self.assertEqual(conn.type, socket.SOCK_DGRAM)
                self.assertIn(conn.laddr[0], ("::", "::1"))
                self.assertEqual(conn.raddr, ())
                self.assertEqual(conn.status, psutil.CONN_NONE,
                                 str(conn.status))
                check_kinds(child, ("all", "inet", "inet6", "udp", "udp6"))
@unittest.skipUnless(POSIX, 'posix only')
def test_get_num_fds(self):
    # every file or socket we open accounts for one more descriptor
    proc = psutil.Process(os.getpid())
    baseline = proc.get_num_fds()
    fileobj = open(TESTFN, 'w')
    self.assertEqual(proc.get_num_fds(), baseline + 1)
    sock = socket.socket()
    self.assertEqual(proc.get_num_fds(), baseline + 2)
    fileobj.close()
    sock.close()
    # back to where we started once both are closed
    self.assertEqual(proc.get_num_fds(), baseline)
@skip_on_not_implemented(only_if=LINUX)
def test_get_num_ctx_switches(self):
    # Keep polling until the sum of the context switch counters of
    # the current process is seen to increase.
    p = psutil.Process(os.getpid())
    before = sum(p.get_num_ctx_switches())
    iterations = 500000
    for _ in range(iterations):
        after = sum(p.get_num_ctx_switches())
        if after > before:
            return
    # the message is built from the actual loop count; it used to
    # claim "50.000 iterations" while looping 500000 times
    self.fail("num ctx switches still the same after %s iterations"
              % iterations)
def test_parent_ppid(self):
    # the subprocess must report the current process as its parent
    my_pid = os.getpid()
    child = get_test_subprocess()
    child_proc = psutil.Process(child.pid)
    self.assertEqual(child_proc.ppid, my_pid)
    self.assertEqual(child_proc.parent.pid, my_pid)
    # no other process is supposed to have us as parent
    for other in psutil.process_iter():
        if other.pid != child.pid:
            self.assertTrue(other.ppid != my_pid)
def test_get_children(self):
    # no children at first, exactly one after spawning a subprocess
    me = psutil.Process(os.getpid())
    self.assertEqual(me.get_children(), [])
    self.assertEqual(me.get_children(recursive=True), [])
    child = get_test_subprocess()
    direct = me.get_children()
    recursive = me.get_children(recursive=True)
    for found in (direct, recursive):
        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only.pid, child.pid)
        self.assertEqual(only.ppid, os.getpid())
def test_get_children_recursive(self):
    # here we create a subprocess which creates another one as in:
    # A (parent) -> B (child) -> C (grandchild)
    script = ("import subprocess, os, sys, time;"
              "PYTHON = os.path.realpath(sys.executable);"
              "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];"
              "subprocess.Popen(cmd);"
              "time.sleep(2);")
    get_test_subprocess(cmd=[PYTHON, "-c", script])
    me = psutil.Process(os.getpid())
    self.assertEqual(len(me.get_children(recursive=False)), 1)
    # give the grandchild some time to start
    deadline = time.time() + 1.5
    while time.time() < deadline:
        descendants = me.get_children(recursive=True)
        if len(descendants) > 1:
            break
    self.assertEqual(len(descendants), 2)
    child, grandchild = descendants[0], descendants[1]
    self.assertEqual(child.ppid, os.getpid())
    self.assertEqual(grandchild.ppid, child.pid)
def test_get_children_duplicates(self):
    # find the process which has the highest number of children
    from psutil._compat import defaultdict
    children_count = defaultdict(int)
    for proc in psutil.process_iter():
        try:
            children_count[proc.ppid] += 1
        except psutil.Error:
            pass
    # this is the one, now let's make sure there are no duplicates
    ranked = list(children_count.items())
    ranked.sort(key=lambda item: item[1])
    busiest_pid = ranked[-1][0]
    busiest = psutil.Process(busiest_pid)
    try:
        descendants = busiest.get_children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(descendants), len(set(descendants)))
def test_suspend_resume(self):
    # suspend a subprocess, wait for it to be reported as stopped,
    # then resume it
    child = get_test_subprocess(wait=True)
    proc = psutil.Process(child.pid)
    proc.suspend()
    # the status change may not be immediate: poll for up to ~1 second
    attempts = 100
    while attempts:
        if proc.status == psutil.STATUS_STOPPED:
            break
        time.sleep(0.01)
        attempts -= 1
    self.assertEqual(str(proc.status), "stopped")
    proc.resume()
    assert proc.status != psutil.STATUS_STOPPED, proc.status
def test_invalid_pid(self):
    # non integer PIDs raise TypeError, a negative one ValueError
    for bad_type in ("1", None):
        self.assertRaises(TypeError, psutil.Process, bad_type)
    self.assertRaises(ValueError, psutil.Process, -1)
def test_as_dict(self):
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    info = proc.as_dict()
    try:
        import json
    except ImportError:
        pass
    else:
        # the returned dict must survive a JSON dump
        json.dumps(info)

    # only the requested attributes are returned
    info = proc.as_dict(attrs=['exe', 'name'])
    self.assertEqual(sorted(info), ['exe', 'name'])

    # lowest PID on the system: in case the value could not be
    # retrieved as a list we expect to get ad_value back instead
    lowest = psutil.Process(min(psutil.get_pid_list()))
    info = lowest.as_dict(attrs=['get_connections'], ad_value='foo')
    connections = info['connections']
    if not isinstance(connections, list):
        self.assertEqual(connections, 'foo')
def test_zombie_process(self):
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    #  >>> proc = Process(1234)
    #  >>> time.sleep(2)  # time-consuming task, process dies in meantime
    #  >>> proc.name
    # Refers to Issue #15
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    proc.wait()

    # names left out of the generic check below
    excluded = ('pid', 'send_signal', 'is_running', 'set_ionice',
                'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
                'nice')
    for name in dir(proc):
        if name.startswith('_') or name in excluded:
            continue
        try:
            member = getattr(proc, name)
            if callable(member):
                member()
        except (psutil.NoSuchProcess, NotImplementedError):
            pass
        else:
            self.fail("NoSuchProcess exception not raised for %r" % name)

    # other methods
    if os.name == 'posix':
        nice_value = 1
    else:
        nice_value = psutil.NORMAL_PRIORITY_CLASS
    try:
        proc.set_nice(nice_value)
    except psutil.NoSuchProcess:
        pass
    else:
        self.fail("exception not raised")
    if hasattr(proc, 'set_ionice'):
        self.assertRaises(psutil.NoSuchProcess, proc.set_ionice, 2)
    self.assertRaises(psutil.NoSuchProcess, proc.send_signal,
                      signal.SIGTERM)
    self.assertRaises(psutil.NoSuchProcess, proc.set_nice, 0)
    self.assertFalse(proc.is_running())
    if hasattr(proc, "set_cpu_affinity"):
        self.assertRaises(psutil.NoSuchProcess, proc.set_cpu_affinity, [0])
def test__str__(self):
    # --- running process: PID and executable name are shown
    child = get_test_subprocess()
    text = str(psutil.Process(child.pid))
    self.assertIn(str(child.pid), text)
    # python shows up as 'Python' in cmdline on OS X so test fails on OS X
    if not OSX:
        self.assertIn(os.path.basename(PYTHON), text)

    # --- terminated process: PID is still shown, along with a marker
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    proc.kill()
    proc.wait()
    self.assertIn(str(child.pid), str(proc))
    self.assertIn("terminated", str(proc))
@unittest.skipIf(LINUX, 'PID 0 not available on Linux')
def test_pid_0(self):
    # Process(0) is supposed to work on all platforms except Linux
    proc = psutil.Process(0)
    self.assertTrue(proc.name)

    if os.name == 'posix':
        try:
            self.assertEqual(proc.uids.real, 0)
            self.assertEqual(proc.gids.real, 0)
        except psutil.AccessDenied:
            pass

    self.assertIn(proc.ppid, (0, 1))
    #self.assertEqual(proc.exe, "")
    proc.cmdline

    # these are allowed to be denied, any other error is a failure
    for method in (proc.get_num_threads, proc.get_memory_info):
        try:
            method()
        except psutil.AccessDenied:
            pass

    # username property
    try:
        if POSIX:
            self.assertEqual(proc.username, 'root')
        elif WINDOWS:
            self.assertEqual(proc.username, 'NT AUTHORITY\\SYSTEM')
        else:
            proc.username
    except psutil.AccessDenied:
        pass

    self.assertIn(0, psutil.get_pid_list())
    self.assertTrue(psutil.pid_exists(0))
def test__all__(self):
    # Any public psutil name which is not an importable module must
    # either be listed in psutil.__all__, be None, or be documented
    # as deprecated.
    ignored = ('callable', 'defaultdict', 'error', 'namedtuple', 'test')
    for name in dir(psutil):
        if name in ignored or name.startswith('_'):
            continue
        try:
            __import__(name)
        except ImportError:
            if name in psutil.__all__:
                continue
            obj = getattr(psutil, name)
            if obj is None:
                continue
            if 'deprecated' not in obj.__doc__.lower():
                self.fail('%r not in psutil.__all__' % name)
def test_Popen(self):
    # Popen class test
    # XXX this test causes a ResourceWarning on Python 3 because
    # psutil.__subproc instance doesn't get properly freed.
    # Not sure what to do though.
    command = [PYTHON, "-c", "import time; time.sleep(2);"]
    proc = psutil.Popen(command,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
    try:
        # one attribute name from "name", one from "stdin"; both must
        # be reachable on the very same object
        for attr in ('name', 'stdin'):
            getattr(proc, attr)
        for attr in ('name', 'stdin'):
            self.assertTrue(hasattr(proc, attr))
        self.assertRaises(AttributeError, getattr, proc, 'foo')
    finally:
        proc.kill()
        proc.wait()
1881 # ===================================================================
# --- Fetch all processes test
1883 # ===================================================================
1885 class TestFetchAllProcesses(unittest.TestCase):
1886 # Iterates over all running processes and performs some sanity
1887 # checks against Process API's returned values.
def setUp(self):
    # On POSIX cache the uids and user names found in the password
    # database; they are used by the uids() and username() checks.
    if not POSIX:
        return
    import pwd
    entries = pwd.getpwall()
    self._uids = set([entry.pw_uid for entry in entries])
    self._usernames = set([entry.pw_name for entry in entries])
def test_fetch_all(self):
    # For every public, read-only attribute of psutil.Process iterate
    # over all running processes, fetch the value and hand it over to
    # the method of this class which has the same name as the
    # attribute. Failures are collected and reported all together.
    valid_procs = 0
    excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
                      'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice',
                      'parent', 'get_children', 'pid']
    attrs = []
    for name in dir(psutil.Process):
        if name.startswith("_"):
            continue
        if name.startswith("set_"):
            continue
        if name in excluded_names:
            continue
        attrs.append(name)

    # sentinel meaning "no value was retrieved"
    default = object()
    failures = []
    for name in attrs:
        for p in psutil.process_iter():
            ret = default
            try:
                try:
                    attr = getattr(p, name, None)
                    if attr is not None and callable(attr):
                        ret = attr()
                    else:
                        ret = attr
                    valid_procs += 1
                except NotImplementedError:
                    register_warning("%r was skipped because not "
                                     "implemented"
                                     % (self.__class__.__name__ +
                                        '.test_' + name))
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    err = sys.exc_info()[1]
                    if isinstance(err, psutil.NoSuchProcess):
                        if psutil.pid_exists(p.pid):
                            # XXX race condition; we probably need
                            # to try figuring out the process
                            # identity before failing
                            self.fail("PID still exists but fun raised "
                                      "NoSuchProcess")
                    self.assertEqual(err.pid, p.pid)
                    if err.name:
                        # make sure exception's name attr is set
                        # with the actual process name
                        self.assertEqual(err.name, p.name)
                    self.assertTrue(str(err))
                    self.assertTrue(err.msg)
                else:
                    if ret not in (0, 0.0, [], None, ''):
                        assert ret, ret
                    meth = getattr(self, name)
                    meth(ret)
            except Exception:
                # Build a unittest-like report for this attribute. The
                # closing parenthesis is appended only once, below; it
                # used to be emitted twice whenever a value had been
                # retrieved.
                s = '\n' + '=' * 70 + '\n'
                s += "FAIL: test_%s (proc=%s" % (name, p)
                if ret is not default:
                    s += ", ret=%s" % repr(ret)
                s += ')\n'
                s += '-' * 70
                s += "\n%s" % traceback.format_exc()
                s = "\n".join((" " * 4) + i for i in s.splitlines())
                failures.append(s)
                # one failure per attribute is enough
                break

    if failures:
        self.fail(''.join(failures))

    # we should always have a non-empty list, not including PID 0 etc.
    # special cases.
    self.assertTrue(valid_procs > 0)
def cmdline(self, ret):
    # nothing is checked for the command line
    return None
def exe(self, ret):
    # A falsy value must be the empty string; anything else must be
    # an absolute path.
    if not ret:
        self.assertEqual(ret, '')
        return
    assert os.path.isabs(ret), ret
    # Note: os.stat() may return False even if the file is there
    # hence we skip the test, see:
    # http://stackoverflow.com/questions/3112546/os-path-exists-lies
    if not POSIX:
        return
    assert os.path.isfile(ret), ret
    can_check_access = hasattr(os, 'access') and hasattr(os, "X_OK")
    if can_check_access:
        # XXX may fail on OSX
        self.assertTrue(os.access(ret, os.X_OK))
def ppid(self, ret):
    # a parent PID is never negative
    non_negative = ret >= 0
    self.assertTrue(non_negative)
def name(self, ret):
    # the name is a non-empty str
    is_string = isinstance(ret, str)
    self.assertTrue(is_string)
    self.assertTrue(ret)
def create_time(self, ret):
    # the creation time is a positive timestamp
    self.assertTrue(ret > 0)
    # this can't be taken for granted on all platforms
    #self.assertGreaterEqual(ret, psutil.BOOT_TIME)
    # make sure returned value can be pretty printed
    # with strftime
    broken_down = time.localtime(ret)
    time.strftime("%Y %m %d %H:%M:%S", broken_down)
def uids(self, ret):
    # every uid is non negative and known to the password database
    # (self._uids gets populated by setUp())
    known = self._uids
    for uid in ret:
        self.assertTrue(uid >= 0)
        self.assertIn(uid, known)
def gids(self, ret):
    # note: testing all gids as done for the uids seems not to be
    # reliable for gid == 30 (nobody); not sure why. Hence we only
    # make sure they are not negative.
    for gid in ret:
        non_negative = gid >= 0
        self.assertTrue(non_negative)
        #self.assertIn(uid, self.gids)
def username(self, ret):
    # never empty; on posix it must also be a known user name
    # (self._usernames gets populated by setUp())
    self.assertTrue(ret)
    on_posix = os.name == 'posix'
    if on_posix:
        self.assertIn(ret, self._usernames)
def status(self, ret):
    # the status is non negative and its str form is not '?'
    self.assertTrue(ret >= 0)
    text = str(ret)
    self.assertTrue(text != '?')
def get_io_counters(self, ret):
    """Check every counter is non-negative, ignoring -1 entries."""
    reported = [counter for counter in ret if counter != -1]
    for counter in reported:
        self.assertTrue(counter >= 0)
def get_ionice(self, ret):
    """Validate the I/O priority value.

    On Linux ``ret`` must expose non-negative ``ioclass`` and ``value``
    attributes; elsewhere it must be one of 0, 1 or 2.
    """
    if not LINUX:
        self.assertTrue(ret >= 0)
        self.assertIn(ret, (0, 1, 2))
        return
    self.assertTrue(ret.ioclass >= 0)
    self.assertTrue(ret.value >= 0)
def get_num_threads(self, ret):
    """Check that at least one thread is reported."""
    has_thread = ret >= 1
    self.assertTrue(has_thread)
def get_threads(self, ret):
    """Check id, user_time and system_time of each thread are >= 0."""
    checked_fields = ('id', 'user_time', 'system_time')
    for thread in ret:
        for field in checked_fields:
            self.assertTrue(getattr(thread, field) >= 0)
def get_cpu_times(self, ret):
    """Check that user and system CPU times are non-negative."""
    for field in ('user', 'system'):
        self.assertTrue(getattr(ret, field) >= 0)
def get_memory_info(self, ret):
    """Check that rss and vms are non-negative."""
    for field in ('rss', 'vms'):
        self.assertTrue(getattr(ret, field) >= 0)
def get_ext_memory_info(self, ret):
    """Validate the extended memory info namedtuple.

    Every field must be non-negative.  On POSIX, when vms is not 0,
    vms must be strictly greater than every other field.  Otherwise,
    on Windows, each peak_* value must be >= its current counterpart.
    """
    fields = ret._fields
    for field in fields:
        self.assertTrue(getattr(ret, field) >= 0)
    if POSIX and ret.vms != 0:
        # VMS is always supposed to be the highest
        others = [field for field in fields if field != 'vms']
        for field in others:
            assert ret.vms > getattr(ret, field), ret
    elif WINDOWS:
        peak_pairs = (
            ('peak_wset', 'wset'),
            ('peak_paged_pool', 'paged_pool'),
            ('peak_nonpaged_pool', 'nonpaged_pool'),
            ('peak_pagefile', 'pagefile'),
        )
        for peak, current in peak_pairs:
            assert getattr(ret, peak) >= getattr(ret, current), ret
def get_open_files(self, ret):
    """Validate each open file entry.

    On Windows fd must be -1, elsewhere an int.  The path must be
    absolute and refer to an existing regular file.
    """
    for entry in ret:
        if not WINDOWS:
            self.assertIsInstance(entry.fd, int)
        else:
            assert entry.fd == -1, entry
        path = entry.path
        assert os.path.isabs(path), entry
        assert os.path.isfile(path), entry
def get_num_fds(self, ret):
    """Check that the number of file descriptors is not negative."""
    non_negative = ret >= 0
    self.assertTrue(non_negative)
def get_connections(self, ret):
    """Validate every connection entry in ``ret``.

    For each entry the socket type and family must be among the
    expected constants, both addresses are handed to
    check_ip_address() and the status must be a known state name.
    For AF_INET entries the local address is additionally bound on
    port 0.  When not on Windows and socket.fromfd() exists, the fd
    is duplicated and its family/type compared with the entry.
    """
    # all values are supposed to match Linux's tcp_states.h states
    # table across all platforms.
    valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
                         "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
                         "LAST_ACK", "LISTEN", "CLOSING", "NONE"]
    if SUNOS:
        valid_conn_states += ["IDLE", "BOUND"]
    if WINDOWS:
        valid_conn_states += ["DELETE_TCB"]
    for conn in ret:
        self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM))
        self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6))
        check_ip_address(conn.laddr, conn.family)
        check_ip_address(conn.raddr, conn.family)
        if conn.status not in valid_conn_states:
            self.fail("%s is not a valid status" % conn.status)
        # actually try to bind the local socket; ignore IPv6
        # sockets as their address might be represented as
        # an IPv4-mapped-address (e.g. "::127.0.0.1")
        # and that's rejected by bind()
        if conn.family == socket.AF_INET:
            s = socket.socket(conn.family, conn.type)
            try:
                s.bind((conn.laddr[0], 0))
            finally:
                # close the probe socket even when bind() raises so
                # its descriptor is not leaked
                s.close()

        if not WINDOWS and hasattr(socket, 'fromfd'):
            dupsock = None
            try:
                try:
                    dupsock = socket.fromfd(conn.fd, conn.family,
                                            conn.type)
                except (socket.error, OSError):
                    err = sys.exc_info()[1]
                    # the fd is no longer valid: skip this entry
                    if err.args[0] == errno.EBADF:
                        continue
                    raise
                # python >= 2.5
                if hasattr(dupsock, "family"):
                    self.assertEqual(dupsock.family, conn.family)
                    self.assertEqual(dupsock.type, conn.type)
            finally:
                if dupsock is not None:
                    dupsock.close()
def getcwd(self, ret):
    """Validate the current working directory of a process.

    None is accepted.  Any other value must be an absolute path; if
    it can be stat()-ed it must be a directory.  A path that no
    longer exists (ENOENT) is tolerated.
    """
    if ret is None:  # BSD may return None
        return
    assert os.path.isabs(ret), ret
    try:
        info = os.stat(ret)
    except OSError:
        err = sys.exc_info()[1]
        # directory has been removed in mean time
        if err.errno != errno.ENOENT:
            raise
    else:
        is_dir = stat.S_ISDIR(info.st_mode)
        self.assertTrue(is_dir)
def get_memory_percent(self, ret):
    """Check that the memory percentage lies within [0, 100]."""
    in_range = 0 <= ret <= 100
    assert in_range, ret
def is_running(self, ret):
    """Check that the process is reported as running (truthy value)."""
    reported = ret
    self.assertTrue(reported)
def get_cpu_affinity(self, ret):
    """Check that the CPU affinity is not an empty list."""
    not_empty = ret != []
    assert not_empty, ret
def terminal(self, ret):
    """Check the terminal, when given, is an existing absolute path."""
    if ret is None:
        return
    assert os.path.isabs(ret), ret
    assert os.path.exists(ret), ret
def get_memory_maps(self, ret):
    """Validate every field of every memory map entry.

    'path' must be absolute unless it starts with '['; 'addr' and
    'perms' must be truthy; any other field must be a non-negative
    int/long.
    """
    for region in ret:
        for field in region._fields:
            value = getattr(region, field)
            if field == 'path':
                if value.startswith('['):
                    continue
                # existence is not checked: on Linux we might get
                # '/foo/bar (deleted)'
                assert os.path.isabs(region.path), region.path
                continue
            if field in ('addr', 'perms'):
                self.assertTrue(value)
                continue
            self.assertIsInstance(value, (int, long))
            assert value >= 0, value
def get_num_handles(self, ret):
    """Check that the number of handles is not negative.

    The same assertion applies on every platform, so no platform
    switch is needed.
    """
    self.assertGreaterEqual(ret, 0)
def get_nice(self, ret):
    """Validate the process priority.

    On POSIX the value must lie within [-20, 20]; elsewhere it must be
    one of the psutil attributes whose name ends with
    '_PRIORITY_CLASS'.
    """
    if POSIX:
        assert -20 <= ret <= 20, ret
        return
    priorities = []
    for attr in dir(psutil):
        if attr.endswith('_PRIORITY_CLASS'):
            priorities.append(getattr(psutil, attr))
    self.assertIn(ret, priorities)
def get_num_ctx_switches(self, ret):
    """Check voluntary and involuntary switch counts are >= 0."""
    for field in ('voluntary', 'involuntary'):
        self.assertTrue(getattr(ret, field) >= 0)
2184 # ===================================================================
2185 # --- Limited user tests
2186 # ===================================================================
if hasattr(os, 'getuid') and os.getuid() == 0:
    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestProcess.__init__(self, *args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # bind the current method as a default argument:
                # a plain closure would look "meth" up when called
                # and every wrapper would run the last test method
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            """Switch effective gid/uid to 1000 before each test."""
            os.setegid(1000)
            os.seteuid(1000)
            TestProcess.setUp(self)

        def tearDown(self):
            """Restore the effective uid/gid saved at class creation."""
            # restore the effective uid first, then the effective gid
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            TestProcess.tearDown(self)

        def test_nice(self):
            """Setting a negative nice value must raise AccessDenied."""
            try:
                psutil.Process(os.getpid()).set_nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")
2230 # ===================================================================
2231 # --- Example script tests
2232 # ===================================================================
class TestExampleScripts(unittest.TestCase):
    """Tests for scripts in the examples directory."""

    def assert_stdout(self, exe, args=None):
        """Run an example script and return its stripped output.

        The script is run through sh() with sys.executable.  When sh()
        raises RuntimeError mentioning 'AccessDenied' the error text is
        returned instead; any other RuntimeError propagates.
        """
        command = os.path.join(EXAMPLES_DIR, exe)
        if args:
            command = command + ' ' + args
        try:
            out = sh(sys.executable + ' ' + command).strip()
        except RuntimeError:
            err = sys.exc_info()[1]
            if 'AccessDenied' not in str(err):
                raise
            return str(err)
        assert out, out
        return out

    def assert_syntax(self, exe, args=None):
        """Parse an example script with ast to check its syntax."""
        path = os.path.join(EXAMPLES_DIR, exe)
        f = open(path, 'r')
        try:
            src = f.read()
        finally:
            f.close()
        ast.parse(src)

    def test_check_presence(self):
        # make sure all example scripts have a test method defined
        meths = dir(self)
        for name in os.listdir(EXAMPLES_DIR):
            if not name.endswith('.py'):
                continue
            expected = 'test_' + os.path.splitext(name)[0]
            if expected in meths:
                continue
            self.fail('no test defined for %r script'
                      % os.path.join(EXAMPLES_DIR, name))

    # --- scripts which are executed and expected to print something

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_process_detail(self):
        self.assert_stdout('process_detail.py')

    def test_who(self):
        self.assert_stdout('who.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    def test_pmap(self):
        self.assert_stdout('pmap.py', args=str(os.getpid()))

    # --- scripts which are only parsed, not executed

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_killall(self):
        self.assert_syntax('killall.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_nettop(self):
        self.assert_syntax('nettop.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_top(self):
        self.assert_syntax('top.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_iotop(self):
        self.assert_syntax('iotop.py')
def cleanup():
    """Exit hook: reap children, close DEVNULL and remove TESTFN."""
    reap_children(search_all=True)
    DEVNULL.close()
    safe_remove(TESTFN)


# run cleanup() at interpreter exit
atexit.register(cleanup)
# remove TESTFN right away as well, before any test runs
safe_remove(TESTFN)
def test_main():
    """Collect all test cases, run them and report the outcome.

    Platform specific test cases are imported lazily, depending on
    the platform flags.  Returns True when the whole run succeeded.
    """
    tests = [TestSystemAPIs, TestProcess, TestFetchAllProcesses]

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite; stays None when the
    # platform matches none of the branches below
    stc = None
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    elif SUNOS:
        from _sunos import SunOSSpecificTestCase as stc
    if stc is not None:
        tests.append(stc)
    else:
        register_warning("no platform specific test suite available "
                         "for %r" % sys.platform)

    if hasattr(os, 'getuid'):
        if 'LimitedUserTestCase' in globals():
            tests.append(LimitedUserTestCase)
        else:
            register_warning("LimitedUserTestCase was skipped (super-user "
                             "privileges are required)")

    tests.append(TestExampleScripts)

    # TestLoader.loadTestsFromTestCase() is what the deprecated
    # unittest.makeSuite() helper delegated to
    loader = unittest.TestLoader()
    test_suite = unittest.TestSuite()
    for test_class in tests:
        test_suite.addTest(loader.loadTestsFromTestCase(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
if __name__ == '__main__':
    # exit with a non-zero status when the test run was not successful
    succeeded = test_main()
    if not succeeded:
        sys.exit(1)