Bug 856434: Bug 837035 didn't create the TransportFlow for DataChannels. Passes...
[gecko.git] / python / psutil / test / test_psutil.py
blobee6888a8e906362b4701670b349ed77801602b2c
1 #!/usr/bin/env python
3 # $Id: test_psutil.py 1525 2012-08-16 16:32:03Z g.rodola $
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
6 # Use of this source code is governed by a BSD-style license that can be
7 # found in the LICENSE file.
9 """
10 psutil test suite.
12 Note: this is targeted for both python 2.x and 3.x so there's no need
13 to use 2to3 tool first.
14 """
16 from __future__ import division
17 import unittest
18 import os
19 import sys
20 import subprocess
21 import time
22 import signal
23 import types
24 import traceback
25 import socket
26 import warnings
27 import atexit
28 import errno
29 import threading
30 import tempfile
31 import stat
32 import collections
33 import datetime
35 import psutil
36 from psutil._compat import PY3, callable, long
# Resolved path of the interpreter running this suite; used to spawn children.
PYTHON = os.path.realpath(sys.executable)
# Null device opened read/write; default stdout/stderr of test subprocesses.
DEVNULL = open(os.devnull, 'r+')
# Scratch file in the current working directory, created by helper children.
TESTFN = os.path.join(os.getcwd(), "$testfile")
# Platform flags used to select or skip platform-specific tests.
POSIX = os.name == 'posix'
LINUX = sys.platform.startswith("linux")
WINDOWS = sys.platform.startswith("win32")
OSX = sys.platform.startswith("darwin")
BSD = sys.platform.startswith("freebsd")
# PIDs recorded by get_test_subprocess() and drained by reap_children().
_subprocesses_started = set()
def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, stdin=None,
                        wait=False):
    """Return a subprocess.Popen object to use in tests.

    By default stdout and stderr are redirected to the null device and
    the python interpreter is used as test process (it sleeps for an
    hour, so it must be killed explicitly or via reap_children()).

    If 'wait' is True attempts to make sure the process is in a
    reasonably initialized state before returning:

    - with the default command the child creates TESTFN as its first
      action and this function polls for that file for up to 3 seconds,
      emitting a warning if it never shows up;
    - with a custom 'cmd' it waits for the PID to appear in the process
      list (see wait_for_pid).

    The PID of the started process is recorded in _subprocesses_started.
    """
    if cmd is None:
        pyline = ""
        if wait:
            pyline += "open('%s', 'w'); " % TESTFN
        pyline += "import time; time.sleep(3600);"
        cmd_ = [PYTHON, "-c", pyline]
    else:
        cmd_ = cmd
    sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin)
    if wait:
        if cmd is None:
            stop_at = time.time() + 3
            # Keep polling while the deadline has NOT passed; with the
            # comparison reversed the body never runs and the 'else'
            # branch warns unconditionally.
            while time.time() < stop_at:
                if os.path.exists(TESTFN):
                    break
                time.sleep(0.001)
            else:
                # Only reached when the loop ends without 'break'.
                warn("couldn't make sure test file was actually created")
        else:
            wait_for_pid(sproc.pid)
    _subprocesses_started.add(sproc.pid)
    return sproc
def warn(msg, warntype=RuntimeWarning):
    """Issue 'msg' as a warning of category 'warntype'."""
    warnings.warn(msg, category=warntype)
def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
    """Run 'cmdline' through the shell and return its stripped stdout.

    Raises RuntimeError, carrying the captured stderr, when the command
    exits with a non-zero status.  Whatever a successful command wrote
    to stderr is reported through warn().
    """
    proc = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
    out, err = proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(err)
    if err:
        warn(err)
    if PY3:
        # communicate() yields bytes on Python 3; decode to text.
        out = str(out, sys.stdout.encoding)
    return out.strip()
def which(program):
    """Same as UNIX which command. Return None on command not found.

    If 'program' contains a directory component it is checked as is;
    otherwise every directory listed in the PATH environment variable
    (os.defpath when PATH is not set) is searched in order and the first
    executable match is returned.
    """
    def is_exe(fpath):
        # A regular file which the current user is allowed to execute.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    if os.path.dirname(program):
        if is_exe(program):
            return program
        return None
    # PATH may legitimately be missing from the environment.
    search_path = os.environ.get("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, program)
        if is_exe(candidate):
            return candidate
    return None
def wait_for_pid(pid, timeout=1):
    """Wait for pid to show up in the process list then return.

    Used in the test suite to give the sub process time to initialize.
    Raises RuntimeError("Timed out") if the pid has not appeared once
    'timeout' seconds have elapsed.
    """
    deadline = time.time() + timeout
    while pid not in psutil.get_pid_list():
        time.sleep(0.0001)
        if time.time() >= deadline:
            raise RuntimeError("Timed out")
    # give it one more iteration to allow full initialization
    time.sleep(0.01)
def reap_children(search_all=False):
    """Kill any subprocess started by this test suite and ensure that
    no zombies stick around to hog resources and create problems when
    looking for refleaks.

    With 'search_all' every (recursive) child of the current process is
    added to the set of PIDs to kill.  The module-level
    _subprocesses_started set is emptied as a side effect.
    """
    pending = _subprocesses_started
    if search_all:
        me = psutil.Process(os.getpid())
        pending.update(kid.pid for kid in me.get_children(recursive=True))
    while pending:
        pid = pending.pop()
        try:
            child = psutil.Process(pid)
            child.kill()
        except psutil.NoSuchProcess:
            # Already gone: nothing to reap.
            continue
        except psutil.AccessDenied:
            warn("couldn't kill child process with pid %s" % pid)
            continue
        # Killed successfully: collect it so no zombie is left behind.
        child.wait(timeout=3)
def check_ip_address(addr, family):
    """Attempts to check IP address's validity.

    'addr' is an (ip, port) pair; a false value is accepted without any
    check.  For AF_INET the ip must be four dot-separated integers in
    the 0-255 range.  Failed checks raise AssertionError.
    """
    if addr:
        ip, port = addr
        assert isinstance(port, int), port
        if family == socket.AF_INET:
            ip = [int(octet) for octet in ip.split('.')]
            assert len(ip) == 4, ip
            assert all(0 <= num <= 255 for num in ip), ip
        assert 0 <= port <= 65535, port
def safe_remove(fname):
    """Delete 'fname', silently ignoring any OSError (e.g. missing file)."""
    try:
        os.unlink(fname)
    except OSError:
        return
def call_until(fun, expr, timeout=1):
    """Keep calling function for timeout secs and exit if eval()
    expression is True.

    'expr' is a Python expression evaluated after every call; it can
    refer to the last return value of 'fun' through the name 'ret'.
    Returns that value as soon as the expression is true, otherwise
    raises RuntimeError once 'timeout' seconds have elapsed.
    """
    # Bound up-front so that the error below can be built even when the
    # loop body never runs (e.g. timeout <= 0).
    ret = None
    stop_at = time.time() + timeout
    while time.time() < stop_at:
        ret = fun()
        # NOTE: eval() on a caller-supplied string; only ever pass
        # trusted, hard-coded expressions.
        if eval(expr):
            return ret
        time.sleep(0.001)
    # Wrap in a 1-tuple so a tuple 'ret' is formatted as a single value.
    raise RuntimeError('timed out (ret=%r)' % (ret,))
def skipIf(condition, reason="", warn=False):
    """Decorator which skips a test if 'condition' is satisfied.

    This is a substitute of unittest.skipIf which is available
    only in python 2.7 and 3.2.
    A skipped test writes "skipped-" to stdout and returns None without
    running the decorated function.
    If 'warn' is true a RuntimeWarning naming the skipped test is
    issued at interpreter exit; 'reason', when provided, is appended
    to that warning message.
    """
    def outer(fun, *args, **kwargs):
        def inner(self):
            if not condition:
                return fun(self, *args, **kwargs)
            sys.stdout.write("skipped-")
            sys.stdout.flush()
            if warn:
                objname = "%s.%s" % (self.__class__.__name__, fun.__name__)
                msg = "%s was skipped" % objname
                if reason:
                    msg += "; reason: " + repr(reason)
                # 'warn' in this scope is the boolean parameter, not the
                # module-level warn() helper it shadows, so it cannot be
                # used as the atexit callback; call warnings.warn directly.
                atexit.register(warnings.warn, msg, RuntimeWarning)
            return None
        return inner
    return outer
def skipUnless(condition, reason="", warn=False):
    """Contrary of skipIf: skip the decorated test unless 'condition' holds."""
    if condition:
        return skipIf(False)
    return skipIf(True, reason, warn)
def ignore_access_denied(fun):
    """Decorator to ignore AccessDenied exceptions.

    Returns a wrapper which calls 'fun' with the same arguments and
    returns its result; if psutil.AccessDenied is raised the exception
    is swallowed and the wrapper returns None.
    """
    # Return the wrapper itself: this decorator is applied without
    # arguments, so handing back a decorator factory would leave the
    # decorated function uncalled.
    def inner(self, *args, **kwargs):
        try:
            return fun(self, *args, **kwargs)
        except psutil.AccessDenied:
            return None
    return inner
def supports_ipv6():
    """Return True if IPv6 is supported on this platform.

    Support is probed by binding a TCP socket to the IPv6 loopback
    address on an ephemeral port; the socket is always closed.
    """
    if not (socket.has_ipv6 and hasattr(socket, "AF_INET6")):
        return False
    sock = None
    try:
        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        sock.bind(("::1", 0))
    except (socket.error, socket.gaierror):
        return False
    else:
        return True
    finally:
        if sock is not None:
            sock.close()
class ThreadTask(threading.Thread):
    """A thread object used for running process thread tests."""

    def __init__(self):
        super(ThreadTask, self).__init__()
        self._running = False
        self._interval = None
        # Set by run() so that start() can block until the thread is live.
        self._flag = threading.Event()

    def __repr__(self):
        return '<%s running=%s at %#x>' % (
            self.__class__.__name__, self._running, id(self))

    def start(self, interval=0.001):
        """Start thread and keep it running until an explicit
        stop() request. Polls for shutdown every 'interval' seconds.

        Returns only after run() has begun executing.  Raises
        ValueError if the thread is already running.
        """
        if self._running:
            raise ValueError("already started")
        self._interval = interval
        super(ThreadTask, self).start()
        self._flag.wait()

    def run(self):
        """Thread body: signal start-up, then idle until stop() is called."""
        self._running = True
        self._flag.set()
        while self._running:
            time.sleep(self._interval)

    def stop(self):
        """Stop thread execution and wait until it is stopped.

        Raises ValueError if the thread is not running.
        """
        if not self._running:
            raise ValueError("already stopped")
        self._running = False
        self.join()
282 class TestCase(unittest.TestCase):
def setUp(self):
    """Remove any leftover TESTFN scratch file before each test."""
    safe_remove(TESTFN)
def tearDown(self):
    """Kill the subprocesses started through get_test_subprocess()."""
    reap_children()
if not hasattr(unittest.TestCase, "assertIn"):
    def assertIn(self, member, container, msg=None):
        """Fallback for unittest versions lacking assertIn: fail unless
        'member' is found in 'container'.
        """
        if member in container:
            return
        self.fail(msg if msg else
                  '%s not found in %s' % (repr(member), repr(container)))
def assert_eq_w_tol(self, first, second, tolerance):
    """Raise AssertionError unless abs(first - second) <= tolerance."""
    difference = abs(first - second)
    if not difference <= tolerance:
        raise AssertionError(
            '%r != %r within %r delta (%r difference)'
            % (first, second, tolerance, difference))
304 # ============================
305 # tests for system-related API
306 # ============================
def test_process_iter(self):
    """process_iter() lists this process and a live child, but not a
    child which has been killed and waited for.
    """
    def listed_pids():
        return [proc.pid for proc in psutil.process_iter()]

    assert os.getpid() in listed_pids()
    sproc = get_test_subprocess()
    assert sproc.pid in listed_pids()
    child = psutil.Process(sproc.pid)
    child.kill()
    child.wait()
    assert sproc.pid not in listed_pids()
def test_TOTAL_PHYMEM(self):
    """TOTAL_PHYMEM is a positive integer equal to virtual_memory().total."""
    total = psutil.TOTAL_PHYMEM
    assert isinstance(total, (int, long))
    assert total > 0
    self.assertEqual(total, psutil.virtual_memory().total)
def test_BOOT_TIME(self):
    """BOOT_TIME is a positive float timestamp lying in the past."""
    boot = psutil.BOOT_TIME
    assert isinstance(boot, float)
    assert boot > 0
    assert boot < time.time(), boot
def test_NUM_CPUS(self):
    """NUM_CPUS matches the number of per-CPU time entries and is >= 1."""
    per_cpu = psutil.cpu_times(percpu=True)
    self.assertEqual(psutil.NUM_CPUS, len(per_cpu))
    assert psutil.NUM_CPUS >= 1, psutil.NUM_CPUS
333 @skipUnless(POSIX)
334 def test_PAGESIZE(self):
335 # pagesize is used internally to perform different calculations
336 # and it's determined by using SC_PAGE_SIZE; make sure
337 # getpagesize() returns the same value.
338 import resource
339 self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
def test_deprecated_apis(self):
    """Every deprecated function, and the Process.nice property, must
    emit a DeprecationWarning.

    Warnings are turned into exceptions for the duration of the test so
    that they can be checked with assertRaises.
    """
    try:
        # Installed inside the 'try' so the filter is always reset, even
        # if creating the Process object fails.
        warnings.filterwarnings("error")
        p = psutil.Process(os.getpid())
        deprecated = [
            psutil.virtmem_usage,
            psutil.used_phymem,
            psutil.avail_phymem,
            psutil.total_virtmem,
            psutil.used_virtmem,
            psutil.avail_virtmem,
            psutil.phymem_usage,
            psutil.get_process_list,
        ]
        if LINUX:
            deprecated.append(psutil.phymem_buffers)
            deprecated.append(psutil.cached_phymem)
        for fun in deprecated:
            self.assertRaises(DeprecationWarning, fun)
        # 'nice' is a property: merely reading it must warn.
        try:
            p.nice
        except DeprecationWarning:
            pass
        else:
            self.fail("p.nice didn't raise DeprecationWarning")
    finally:
        warnings.resetwarnings()
366 def test_deprecated_apis_retval(self):
367 warnings.filterwarnings("ignore")
368 p = psutil.Process(os.getpid())
369 try:
370 self.assertEqual(psutil.total_virtmem(), psutil.swap_memory().total)
371 self.assertEqual(psutil.get_process_list(), list(psutil.process_iter()))
372 self.assertEqual(p.nice, p.get_nice())
373 finally:
374 warnings.resetwarnings()
def test_virtual_memory(self):
    """Sanity-check virtual_memory(): positive totals, a 0-100 percent
    and no field (other than 'total') negative or larger than the total.
    """
    mem = psutil.virtual_memory()
    assert mem.total > 0, mem
    assert mem.available > 0, mem
    assert 0 <= mem.percent <= 100, mem
    assert mem.used > 0, mem
    assert mem.free >= 0, mem
    for name in mem._fields:
        if name == 'total':
            continue
        value = getattr(mem, name)
        if not value >= 0:
            self.fail("%r < 0 (%s)" % (name, value))
        if value > mem.total:
            self.fail("%r > total (total=%s, %s=%s)"
                      % (name, mem.total, name, value))
392 def test_swap_memory(self):
393 mem = psutil.swap_memory()
394 assert mem.total > 0, mem
395 assert mem.used >= 0, mem
396 assert mem.free > 0, mem
397 assert 0 <= mem.percent <= 100, mem
398 assert mem.sin >= 0, mem
399 assert mem.sout >= 0, mem
def test_pid_exists(self):
    """pid_exists() is true for a live child and false once it has been
    killed and waited for; it is also false for pid -1.
    """
    child_pid = get_test_subprocess(wait=True).pid
    assert psutil.pid_exists(child_pid)
    proc = psutil.Process(child_pid)
    proc.kill()
    proc.wait()
    self.assertFalse(psutil.pid_exists(child_pid))
    self.assertFalse(psutil.pid_exists(-1))
410 def test_pid_exists_2(self):
411 reap_children()
412 pids = psutil.get_pid_list()
413 for pid in pids:
414 try:
415 assert psutil.pid_exists(pid)
416 except AssertionError:
417 # in case the process disappeared in meantime fail only
418 # if it is no longer in get_pid_list()
419 time.sleep(.1)
420 if pid in psutil.get_pid_list():
421 self.fail(pid)
422 pids = range(max(pids) + 5000, max(pids) + 6000)
423 for pid in pids:
424 self.assertFalse(psutil.pid_exists(pid))
def test_get_pid_list(self):
    """get_pid_list() reports the same PIDs as process_iter() and
    contains no duplicates.
    """
    plist = [x.pid for x in psutil.process_iter()]
    pidlist = psutil.get_pid_list()
    # list.sort() sorts in place and returns None, so comparing its
    # return values would always succeed; compare sorted copies instead.
    # NOTE(review): the two snapshots are taken at different times, so a
    # process starting or exiting in between can make this fail.
    self.assertEqual(sorted(plist), sorted(pidlist))
    # make sure every pid is unique
    self.assertEqual(len(pidlist), len(set(pidlist)))
433 def test_test(self):
434 # test for psutil.test() function
435 stdout = sys.stdout
436 sys.stdout = DEVNULL
437 try:
438 psutil.test()
439 finally:
440 sys.stdout = stdout
def test_sys_cpu_times(self):
    """cpu_times() fields are non-negative floats; the result supports
    sum() and str().
    """
    times = psutil.cpu_times()
    sum(times)
    total = 0
    for cp_time in times:
        assert isinstance(cp_time, float)
        assert cp_time >= 0.0, cp_time
        total += cp_time
    self.assertEqual(total, sum(times))
    str(times)
def test_sys_cpu_times2(self):
    """The summed system CPU times grow by at least 0.05 over a 0.1
    second sleep.
    """
    before = sum(psutil.cpu_times())
    time.sleep(0.1)
    after = sum(psutil.cpu_times())
    difference = after - before
    if not difference >= 0.05:
        self.fail("difference %s" % difference)
461 def test_sys_per_cpu_times(self):
462 for times in psutil.cpu_times(percpu=True):
463 total = 0
464 sum(times)
465 for cp_time in times:
466 assert isinstance(cp_time, float)
467 assert cp_time >= 0.0, cp_time
468 total += cp_time
469 self.assertEqual(total, sum(times))
470 str(times)
def test_sys_per_cpu_times2(self):
    """After 0.1 seconds at least one CPU must show an increase of
    0.05 or more in its summed times.
    """
    tot1 = psutil.cpu_times(percpu=True)
    stop_at = time.time() + 0.1
    # Spin (no sleep) until the deadline passes.
    while time.time() < stop_at:
        pass
    tot2 = psutil.cpu_times(percpu=True)
    for before, after in zip(tot1, tot2):
        if sum(after) - sum(before) >= 0.05:
            return
    self.fail()
def test_sys_cpu_percent(self):
    """cpu_percent() always returns a float within the 0-100 range."""
    for _ in range(2):
        psutil.cpu_percent(interval=0.001)
    for _ in range(1000):
        percent = psutil.cpu_percent(interval=None)
        assert isinstance(percent, float)
        assert percent >= 0.0, percent
        assert percent <= 100.0, percent
495 def test_sys_per_cpu_percent(self):
496 psutil.cpu_percent(interval=0.001, percpu=True)
497 psutil.cpu_percent(interval=0.001, percpu=True)
498 for x in range(1000):
499 percents = psutil.cpu_percent(interval=None, percpu=True)
500 for percent in percents:
501 assert isinstance(percent, float)
502 assert percent >= 0.0, percent
503 assert percent <= 100.0, percent
505 def test_disk_usage(self):
506 usage = psutil.disk_usage(os.getcwd())
507 assert usage.total > 0, usage
508 assert usage.used > 0, usage
509 assert usage.free > 0, usage
510 assert usage.total > usage.used, usage
511 assert usage.total > usage.free, usage
512 assert 0 <= usage.percent <= 100, usage.percent
514 # if path does not exist OSError ENOENT is expected across
515 # all platforms
516 fname = tempfile.mktemp()
517 try:
518 psutil.disk_usage(fname)
519 except OSError:
520 err = sys.exc_info()[1]
521 if err.args[0] != errno.ENOENT:
522 raise
523 else:
524 self.fail("OSError not raised")
526 def test_disk_partitions(self):
527 for disk in psutil.disk_partitions(all=False):
528 assert os.path.exists(disk.device), disk
529 assert os.path.isdir(disk.mountpoint), disk
530 assert disk.fstype, disk
531 assert isinstance(disk.opts, str)
532 for disk in psutil.disk_partitions(all=True):
533 if not WINDOWS:
534 try:
535 os.stat(disk.mountpoint)
536 except OSError:
537 # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
538 err = sys.exc_info()[1]
539 if err.errno not in (errno.EPERM, errno.EACCES):
540 raise
541 else:
542 assert os.path.isdir(disk.mountpoint), disk.mountpoint
543 assert isinstance(disk.fstype, str)
544 assert isinstance(disk.opts, str)
546 def find_mount_point(path):
547 path = os.path.abspath(path)
548 while not os.path.ismount(path):
549 path = os.path.dirname(path)
550 return path
552 mount = find_mount_point(__file__)
553 mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
554 self.assertIn(mount, mounts)
555 psutil.disk_usage(mount)
557 def test_network_io_counters(self):
558 def check_ntuple(nt):
559 self.assertEqual(nt[0], nt.bytes_sent)
560 self.assertEqual(nt[1], nt.bytes_recv)
561 self.assertEqual(nt[2], nt.packets_sent)
562 self.assertEqual(nt[3], nt.packets_recv)
563 self.assertEqual(nt[4], nt.errin)
564 self.assertEqual(nt[5], nt.errout)
565 self.assertEqual(nt[6], nt.dropin)
566 self.assertEqual(nt[7], nt.dropout)
567 assert nt.bytes_sent >= 0, nt
568 assert nt.bytes_recv >= 0, nt
569 assert nt.packets_sent >= 0, nt
570 assert nt.packets_recv >= 0, nt
571 assert nt.errin >= 0, nt
572 assert nt.errout >= 0, nt
573 assert nt.dropin >= 0, nt
574 assert nt.dropout >= 0, nt
576 ret = psutil.network_io_counters(pernic=False)
577 check_ntuple(ret)
578 ret = psutil.network_io_counters(pernic=True)
579 assert ret != []
580 for key in ret:
581 assert key
582 check_ntuple(ret[key])
584 def test_disk_io_counters(self):
585 def check_ntuple(nt):
586 self.assertEqual(nt[0], nt.read_count)
587 self.assertEqual(nt[1], nt.write_count)
588 self.assertEqual(nt[2], nt.read_bytes)
589 self.assertEqual(nt[3], nt.write_bytes)
590 self.assertEqual(nt[4], nt.read_time)
591 self.assertEqual(nt[5], nt.write_time)
592 assert nt.read_count >= 0, nt
593 assert nt.write_count >= 0, nt
594 assert nt.read_bytes >= 0, nt
595 assert nt.write_bytes >= 0, nt
596 assert nt.read_time >= 0, nt
597 assert nt.write_time >= 0, nt
599 ret = psutil.disk_io_counters(perdisk=False)
600 check_ntuple(ret)
601 ret = psutil.disk_io_counters(perdisk=True)
602 for key in ret:
603 assert key, key
604 check_ntuple(ret[key])
606 def test_get_users(self):
607 users = psutil.get_users()
608 assert users
609 for user in users:
610 assert user.name, user
611 user.terminal
612 user.host
613 assert user.started > 0.0, user
614 datetime.datetime.fromtimestamp(user.started)
616 # ====================
617 # Process object tests
618 # ====================
def test_kill(self):
    """After kill() + wait() the child's pid must not exist any more
    (unless its recorded name differs from PYTHON).
    """
    test_pid = get_test_subprocess(wait=True).pid
    proc = psutil.Process(test_pid)
    name = proc.name
    proc.kill()
    proc.wait()
    still_there = psutil.pid_exists(test_pid) and name == PYTHON
    self.assertFalse(still_there)
def test_terminate(self):
    """After terminate() + wait() the child's pid must not exist any
    more (unless its recorded name differs from PYTHON).
    """
    test_pid = get_test_subprocess(wait=True).pid
    proc = psutil.Process(test_pid)
    name = proc.name
    proc.terminate()
    proc.wait()
    still_there = psutil.pid_exists(test_pid) and name == PYTHON
    self.assertFalse(still_there)
638 def test_send_signal(self):
639 if POSIX:
640 sig = signal.SIGKILL
641 else:
642 sig = signal.SIGTERM
643 sproc = get_test_subprocess()
644 test_pid = sproc.pid
645 p = psutil.Process(test_pid)
646 name = p.name
647 p.send_signal(sig)
648 p.wait()
649 self.assertFalse(psutil.pid_exists(test_pid) and name == PYTHON)
651 def test_wait(self):
652 # check exit code signal
653 sproc = get_test_subprocess()
654 p = psutil.Process(sproc.pid)
655 p.kill()
656 code = p.wait()
657 if os.name == 'posix':
658 self.assertEqual(code, signal.SIGKILL)
659 else:
660 self.assertEqual(code, 0)
661 self.assertFalse(p.is_running())
663 sproc = get_test_subprocess()
664 p = psutil.Process(sproc.pid)
665 p.terminate()
666 code = p.wait()
667 if os.name == 'posix':
668 self.assertEqual(code, signal.SIGTERM)
669 else:
670 self.assertEqual(code, 0)
671 self.assertFalse(p.is_running())
673 # check sys.exit() code
674 code = "import time, sys; time.sleep(0.01); sys.exit(5);"
675 sproc = get_test_subprocess([PYTHON, "-c", code])
676 p = psutil.Process(sproc.pid)
677 self.assertEqual(p.wait(), 5)
678 self.assertFalse(p.is_running())
680 # Test wait() issued twice.
681 # It is not supposed to raise NSP when the process is gone.
682 # On UNIX this should return None, on Windows it should keep
683 # returning the exit code.
684 sproc = get_test_subprocess([PYTHON, "-c", code])
685 p = psutil.Process(sproc.pid)
686 self.assertEqual(p.wait(), 5)
687 self.assertIn(p.wait(), (5, None))
689 # test timeout
690 sproc = get_test_subprocess()
691 p = psutil.Process(sproc.pid)
692 p.name
693 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
695 # timeout < 0 not allowed
696 self.assertRaises(ValueError, p.wait, -1)
698 @skipUnless(POSIX)
699 def test_wait_non_children(self):
700 # test wait() against processes which are not our children
701 code = "import sys;"
702 code += "from subprocess import Popen, PIPE;"
703 code += "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
704 code += "sp = Popen(cmd, stdout=PIPE);"
705 code += "sys.stdout.write(str(sp.pid));"
706 sproc = get_test_subprocess([PYTHON, "-c", code], stdout=subprocess.PIPE)
708 grandson_pid = int(sproc.stdout.read())
709 grandson_proc = psutil.Process(grandson_pid)
710 try:
711 self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
712 grandson_proc.kill()
713 ret = grandson_proc.wait()
714 self.assertEqual(ret, None)
715 finally:
716 if grandson_proc.is_running():
717 grandson_proc.kill()
718 grandson_proc.wait()
720 def test_wait_timeout_0(self):
721 sproc = get_test_subprocess()
722 p = psutil.Process(sproc.pid)
723 self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
724 p.kill()
725 stop_at = time.time() + 2
726 while 1:
727 try:
728 code = p.wait(0)
729 except psutil.TimeoutExpired:
730 if time.time() >= stop_at:
731 raise
732 else:
733 break
734 if os.name == 'posix':
735 self.assertEqual(code, signal.SIGKILL)
736 else:
737 self.assertEqual(code, 0)
738 self.assertFalse(p.is_running())
740 def test_cpu_percent(self):
741 p = psutil.Process(os.getpid())
742 p.get_cpu_percent(interval=0.001)
743 p.get_cpu_percent(interval=0.001)
744 for x in range(100):
745 percent = p.get_cpu_percent(interval=None)
746 assert isinstance(percent, float)
747 assert percent >= 0.0, percent
748 if os.name != 'posix':
749 assert percent <= 100.0, percent
750 else:
751 assert percent >= 0.0, percent
753 def test_cpu_times(self):
754 times = psutil.Process(os.getpid()).get_cpu_times()
755 assert (times.user > 0.0) or (times.system > 0.0), times
756 # make sure returned values can be pretty printed with strftime
757 time.strftime("%H:%M:%S", time.localtime(times.user))
758 time.strftime("%H:%M:%S", time.localtime(times.system))
760 # Test Process.cpu_times() against os.times()
761 # os.times() is broken on Python 2.6
762 # http://bugs.python.org/issue1040026
763 # XXX fails on OSX: not sure if it's for os.times(). We should
764 # try this with Python 2.7 and re-enable the test.
766 @skipUnless(sys.version_info > (2, 6, 1) and not OSX)
767 def test_cpu_times2(self):
768 user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
769 utime, ktime = os.times()[:2]
771 # Use os.times()[:2] as base values to compare our results
772 # using a tolerance of +/- 0.1 seconds.
773 # It will fail if the difference between the values is > 0.1s.
774 if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
775 self.fail("expected: %s, found: %s" %(utime, user_time))
777 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
778 self.fail("expected: %s, found: %s" %(ktime, kernel_time))
def test_create_time(self):
    """create_time of a freshly spawned child is close to time.time()
    and can be formatted with strftime.
    """
    sproc = get_test_subprocess(wait=True)
    now = time.time()
    p = psutil.Process(sproc.pid)
    create_time = p.create_time

    # Use time.time() as base value to compare our result using a
    # tolerance of +/- 2 seconds.
    # It will fail if the difference between the values is > 2s.
    difference = abs(create_time - now)
    if difference > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (now, create_time, difference))

    # make sure returned value can be pretty printed with strftime
    time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time))
797 @skipIf(WINDOWS)
798 def test_terminal(self):
799 tty = sh('tty')
800 p = psutil.Process(os.getpid())
801 self.assertEqual(p.terminal, tty)
803 @skipIf(OSX, warn=False)
804 def test_get_io_counters(self):
805 p = psutil.Process(os.getpid())
806 # test reads
807 io1 = p.get_io_counters()
808 f = open(PYTHON, 'rb')
809 f.read()
810 f.close()
811 io2 = p.get_io_counters()
812 if not BSD:
813 assert io2.read_count > io1.read_count, (io1, io2)
814 self.assertEqual(io2.write_count, io1.write_count)
815 assert io2.read_bytes >= io1.read_bytes, (io1, io2)
816 assert io2.write_bytes >= io1.write_bytes, (io1, io2)
817 # test writes
818 io1 = p.get_io_counters()
819 f = tempfile.TemporaryFile()
820 if PY3:
821 f.write(bytes("x" * 1000000, 'ascii'))
822 else:
823 f.write("x" * 1000000)
824 f.close()
825 io2 = p.get_io_counters()
826 assert io2.write_count >= io1.write_count, (io1, io2)
827 assert io2.write_bytes >= io1.write_bytes, (io1, io2)
828 assert io2.read_count >= io1.read_count, (io1, io2)
829 assert io2.read_bytes >= io1.read_bytes, (io1, io2)
831 @skipUnless(LINUX)
832 def test_get_set_ionice(self):
833 from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, IOPRIO_CLASS_BE,
834 IOPRIO_CLASS_IDLE)
835 self.assertEqual(IOPRIO_CLASS_NONE, 0)
836 self.assertEqual(IOPRIO_CLASS_RT, 1)
837 self.assertEqual(IOPRIO_CLASS_BE, 2)
838 self.assertEqual(IOPRIO_CLASS_IDLE, 3)
839 p = psutil.Process(os.getpid())
840 try:
841 p.set_ionice(2)
842 ioclass, value = p.get_ionice()
843 self.assertEqual(ioclass, 2)
844 self.assertEqual(value, 4)
846 p.set_ionice(3)
847 ioclass, value = p.get_ionice()
848 self.assertEqual(ioclass, 3)
849 self.assertEqual(value, 0)
851 p.set_ionice(2, 0)
852 ioclass, value = p.get_ionice()
853 self.assertEqual(ioclass, 2)
854 self.assertEqual(value, 0)
855 p.set_ionice(2, 7)
856 ioclass, value = p.get_ionice()
857 self.assertEqual(ioclass, 2)
858 self.assertEqual(value, 7)
859 self.assertRaises(ValueError, p.set_ionice, 2, 10)
860 finally:
861 p.set_ionice(IOPRIO_CLASS_NONE)
def test_get_num_threads(self):
    """Starting one extra thread raises get_num_threads() by exactly one."""
    # on certain platforms such as Linux we might test for exact
    # thread number, since we always have with 1 thread per process,
    # but this does not apply across all platforms (OSX, Windows)
    proc = psutil.Process(os.getpid())
    before = proc.get_num_threads()

    thread = ThreadTask()
    thread.start()
    try:
        after = proc.get_num_threads()
        self.assertEqual(after, before + 1)
    finally:
        # Always stop the helper thread, whether or not the check passed.
        if thread._running:
            thread.stop()
880 @skipUnless(WINDOWS)
881 def test_get_num_handles(self):
882 # a better test is done later into test/_windows.py
883 p = psutil.Process(os.getpid())
884 assert p.get_num_handles() > 0
886 def test_get_threads(self):
887 p = psutil.Process(os.getpid())
888 step1 = p.get_threads()
890 thread = ThreadTask()
891 thread.start()
893 try:
894 step2 = p.get_threads()
895 self.assertEqual(len(step2), len(step1) + 1)
896 # on Linux, first thread id is supposed to be this process
897 if LINUX:
898 self.assertEqual(step2[0].id, os.getpid())
899 athread = step2[0]
900 # test named tuple
901 self.assertEqual(athread.id, athread[0])
902 self.assertEqual(athread.user_time, athread[1])
903 self.assertEqual(athread.system_time, athread[2])
904 # test num threads
905 thread.stop()
906 finally:
907 if thread._running:
908 thread.stop()
910 def test_get_memory_info(self):
911 p = psutil.Process(os.getpid())
913 # step 1 - get a base value to compare our results
914 rss1, vms1 = p.get_memory_info()
915 percent1 = p.get_memory_percent()
916 assert rss1 > 0
917 assert vms1 > 0
919 # step 2 - allocate some memory
920 memarr = [None] * 1500000
922 rss2, vms2 = p.get_memory_info()
923 percent2 = p.get_memory_percent()
924 # make sure that the memory usage bumped up
925 assert rss2 > rss1
926 assert vms2 >= vms1 # vms might be equal
927 assert percent2 > percent1
928 del memarr
930 # def test_get_ext_memory_info(self):
931 # # tested later in fetch all test suite
933 def test_get_memory_maps(self):
934 p = psutil.Process(os.getpid())
935 maps = p.get_memory_maps()
936 paths = [x for x in maps]
937 self.assertEqual(len(paths), len(set(paths)))
938 ext_maps = p.get_memory_maps(grouped=False)
940 for nt in maps:
941 if not nt.path.startswith('['):
942 assert os.path.isabs(nt.path), nt.path
943 if POSIX:
944 assert os.path.exists(nt.path), nt.path
945 else:
946 # XXX - On Windows we have this strange behavior with
947 # 64 bit dlls: they are visible via explorer but cannot
948 # be accessed via os.stat() (wtf?).
949 if '64' not in os.path.basename(nt.path):
950 assert os.path.exists(nt.path), nt.path
951 for nt in ext_maps:
952 for fname in nt._fields:
953 value = getattr(nt, fname)
954 if fname == 'path':
955 continue
956 elif fname in ('addr', 'perms'):
957 assert value, value
958 else:
959 assert isinstance(value, (int, long))
960 assert value >= 0, value
962 def test_get_memory_percent(self):
963 p = psutil.Process(os.getpid())
964 assert p.get_memory_percent() > 0.0
966 def test_pid(self):
967 sproc = get_test_subprocess()
968 self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
def test_is_running(self):
    """is_running() is true while the child lives and false after it
    has been killed; each state is queried twice in a row.
    """
    proc = psutil.Process(get_test_subprocess(wait=True).pid)
    for _ in range(2):
        assert proc.is_running()
    proc.kill()
    proc.wait()
    for _ in range(2):
        assert not proc.is_running()
980 def test_exe(self):
981 sproc = get_test_subprocess(wait=True)
982 exe = psutil.Process(sproc.pid).exe
983 try:
984 self.assertEqual(exe, PYTHON)
985 except AssertionError:
986 # certain platforms such as BSD are more accurate returning:
987 # "/usr/local/bin/python2.7"
988 # ...instead of:
989 # "/usr/local/bin/python"
990 # We do not want to consider this difference in accuracy
991 # an error.
992 ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
993 self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
995 def test_cmdline(self):
996 sproc = get_test_subprocess([PYTHON, "-E"], wait=True)
997 self.assertEqual(psutil.Process(sproc.pid).cmdline, [PYTHON, "-E"])
999 def test_name(self):
1000 sproc = get_test_subprocess(PYTHON, wait=True)
1001 pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
1002 self.assertEqual(psutil.Process(sproc.pid).name.lower(), pyexe)
1004 if os.name == 'posix':
def test_uids(self):
    """Process.uids of this process matches the values reported by os."""
    uids = psutil.Process(os.getpid()).uids
    real, effective, saved = uids
    # os.getuid() refers to "real" uid
    self.assertEqual(real, os.getuid())
    # os.geteuid() refers to "effective" uid
    self.assertEqual(effective, os.geteuid())
    # no such thing as os.getsuid() ("saved" uid), but starting
    # from python 2.7 we have os.getresuid()[2]
    if hasattr(os, "getresuid"):
        self.assertEqual(saved, os.getresuid()[2])
def test_gids(self):
    """Process.gids of this process matches the values reported by os."""
    p = psutil.Process(os.getpid())
    real, effective, saved = p.gids
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # no such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid()[2]
    # Guard on the function actually called below (getresgid).
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
1030 def test_nice(self):
1031 p = psutil.Process(os.getpid())
1032 self.assertRaises(TypeError, p.set_nice, "str")
1033 try:
1034 try:
1035 first_nice = p.get_nice()
1036 p.set_nice(1)
1037 self.assertEqual(p.get_nice(), 1)
1038 # going back to previous nice value raises AccessDenied on OSX
1039 if not OSX:
1040 p.set_nice(0)
1041 self.assertEqual(p.get_nice(), 0)
1042 except psutil.AccessDenied:
1043 pass
1044 finally:
1045 try:
1046 p.set_nice(first_nice)
1047 except psutil.AccessDenied:
1048 pass
if os.name == 'nt':

    def test_nice(self):
        # on Windows "nice" is a priority class: switch NORMAL -> HIGH ->
        # NORMAL and always leave the process at NORMAL afterwards
        me = psutil.Process(os.getpid())
        self.assertRaises(TypeError, me.set_nice, "str")
        try:
            self.assertEqual(me.get_nice(), psutil.NORMAL_PRIORITY_CLASS)
            for priority in (psutil.HIGH_PRIORITY_CLASS,
                             psutil.NORMAL_PRIORITY_CLASS):
                me.set_nice(priority)
                self.assertEqual(me.get_nice(), priority)
        finally:
            me.set_nice(psutil.NORMAL_PRIORITY_CLASS)
def test_status(self):
    # the test process is inspecting itself, hence it must be running
    me = psutil.Process(os.getpid())
    self.assertEqual(me.status, psutil.STATUS_RUNNING)
    status_text = str(me.status)
    self.assertEqual(status_text, "running")
def test_status_constants(self):
    # STATUS_* constants are supposed to be comparable both by their
    # numeric value and by their str representation, with "==" and "!="
    running = psutil.STATUS_RUNNING
    others = (1, 'sleeping')
    for same in (0, long(0), 'running'):
        self.assertTrue(running == same)
    for other in others:
        self.assertFalse(running == other)
    for same in (0, 'running'):
        self.assertFalse(running != same)
    for other in others:
        self.assertTrue(running != other)
def test_username(self):
    # a child process runs as the same user as the test suite
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    if POSIX:
        import pwd
        actual = proc.username
        self.assertEqual(actual, pwd.getpwuid(os.getuid()).pw_name)
    elif WINDOWS:
        # on Windows the user name has the form DOMAIN\user
        wanted_user = os.environ['USERNAME']
        wanted_domain = os.environ['USERDOMAIN']
        parts = proc.username.split('\\')
        domain, username = parts
        self.assertEqual(domain, wanted_domain)
        self.assertEqual(username, wanted_user)
    else:
        # unknown platform: only make sure the attribute can be read
        proc.username
@skipIf(not hasattr(psutil.Process, "getcwd"))
def test_getcwd(self):
    # the default test subprocess is expected to report the same working
    # directory as the test suite
    child = get_test_subprocess(wait=True)
    child_cwd = psutil.Process(child.pid).getcwd()
    self.assertEqual(child_cwd, os.getcwd())
@skipIf(not hasattr(psutil.Process, "getcwd"))
def test_getcwd_2(self):
    # the child moves one directory up; poll until psutil reports it
    script = "import os, time; os.chdir('..'); time.sleep(10)"
    child = get_test_subprocess([PYTHON, "-c", script], wait=True)
    proc = psutil.Process(child.pid)
    call_until(proc.getcwd, "ret == os.path.dirname(os.getcwd())", timeout=1)
@skipIf(not hasattr(psutil.Process, "get_cpu_affinity"))
def test_cpu_affinity(self):
    # pin the current process to every single CPU in turn, then to all of
    # them, and finally make sure an out-of-range CPU is rejected
    p = psutil.Process(os.getpid())
    initial = p.get_cpu_affinity()
    # number of CPUs, counted once and reused for the invalid index below
    num_cpus = len(psutil.cpu_times(percpu=True))
    all_cpus = list(range(num_cpus))
    try:
        for n in all_cpus:
            p.set_cpu_affinity([n])
            self.assertEqual(p.get_cpu_affinity(), [n])

        p.set_cpu_affinity(all_cpus)
        self.assertEqual(p.get_cpu_affinity(), all_cpus)
    finally:
        # always restore the original affinity, otherwise a failed
        # assertion would leave the whole test run pinned to one CPU
        p.set_cpu_affinity(initial)

    invalid_cpu = [num_cpus + 10]
    self.assertRaises(ValueError, p.set_cpu_affinity, invalid_cpu)
def test_get_open_files(self):
    # current process
    p = psutil.Process(os.getpid())
    files = p.get_open_files()
    # get_open_files() returns records with a 'path' field, so the paths
    # (not the records) are what must be compared against TESTFN
    self.assertFalse(TESTFN in [x.path for x in files])
    f = open(TESTFN, 'w')
    try:
        call_until(p.get_open_files, "len(ret) != %i" % len(files))
        filenames = [x.path for x in p.get_open_files()]
        self.assertIn(TESTFN, filenames)
    finally:
        # do not leak the handle into the following tests on failure
        f.close()
    for path in filenames:
        assert os.path.isfile(path), path

    # another process
    cmdline = "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
    sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
    p = psutil.Process(sproc.pid)
    # give the child up to ~1 second to open the file
    for attempt in range(100):
        filenames = [x.path for x in p.get_open_files()]
        if TESTFN in filenames:
            break
        time.sleep(.01)
    else:
        self.assertIn(TESTFN, filenames)
    for path in filenames:
        assert os.path.isfile(path), path
def test_get_open_files2(self):
    # test fd and path fields
    p = psutil.Process(os.getpid())
    fileobj = open(TESTFN, 'w')
    try:
        for path, fd in p.get_open_files():
            if path == fileobj.name or fd == fileobj.fileno():
                break
        else:
            self.fail("no file found; files=%s" % repr(p.get_open_files()))
        self.assertEqual(path, fileobj.name)
        if WINDOWS:
            self.assertEqual(fd, -1)
        else:
            self.assertEqual(fd, fileobj.fileno())
        # test positions
        ntuple = p.get_open_files()[0]
        self.assertEqual(ntuple[0], ntuple.path)
        self.assertEqual(ntuple[1], ntuple.fd)
    finally:
        # close the handle even when one of the checks above fails
        fileobj.close()
    # test file is gone; compare against the paths, a plain string is
    # never equal to one of the returned records
    open_paths = [x.path for x in p.get_open_files()]
    self.assertTrue(fileobj.name not in open_paths)
def test_get_connections(self):
    # the child binds a listening TCP socket on the loopback interface
    # and then blocks in accept()
    script = "".join([
        "import socket, time;",
        "s = socket.socket();",
        "s.bind(('127.0.0.1', 0));",
        "s.listen(1);",
        "conn, addr = s.accept();",
        "time.sleep(100);",
    ])
    child = get_test_subprocess([PYTHON, "-c", script])
    proc = psutil.Process(child.pid)
    # poll for up to ~1 second until the socket shows up
    for attempt in range(100):
        cons = proc.get_connections()
        if cons:
            break
        time.sleep(.01)
    self.assertEqual(len(cons), 1)
    con = cons[0]
    self.assertEqual(con.family, socket.AF_INET)
    self.assertEqual(con.type, socket.SOCK_STREAM)
    self.assertEqual(con.status, "LISTEN")
    ip, port = con.local_address
    self.assertEqual(ip, '127.0.0.1')
    self.assertEqual(con.remote_address, ())
    if WINDOWS:
        self.assertEqual(con.fd, -1)
    else:
        assert con.fd > 0, con
    # test positions: every field must also be reachable by index
    field_order = ('fd', 'family', 'type', 'local_address',
                   'remote_address', 'status')
    for index, field in enumerate(field_order):
        self.assertEqual(con[index], getattr(con, field))
    # test kind arg
    self.assertRaises(ValueError, proc.get_connections, 'foo')
@skipUnless(supports_ipv6())
def test_get_connections_ipv6(self):
    # a listening IPv6 socket owned by the current process must be
    # reported with its '::1' local address
    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        s.bind(('::1', 0))
        s.listen(1)
        cons = psutil.Process(os.getpid()).get_connections()
    finally:
        # close the socket even if bind/listen/get_connections raises
        s.close()
    self.assertEqual(len(cons), 1)
    self.assertEqual(cons[0].local_address[0], '::1')
@skipUnless(hasattr(socket, 'AF_UNIX'))
def test_get_connections_unix(self):
    # stream socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.bind(TESTFN)
        conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
        self.assertEqual(conn.fd, sock.fileno())
        self.assertEqual(conn.family, socket.AF_UNIX)
        self.assertEqual(conn.type, socket.SOCK_STREAM)
        self.assertEqual(conn.local_address, TESTFN)
        self.assertTrue(not conn.remote_address)
        self.assertTrue(not conn.status)
    finally:
        # never leave the socket open for the datagram half or for the
        # following tests
        sock.close()
    # datagram socket
    safe_remove(TESTFN)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        sock.bind(TESTFN)
        conn = psutil.Process(os.getpid()).get_connections(kind='unix')[0]
        self.assertEqual(conn.type, socket.SOCK_DGRAM)
    finally:
        sock.close()
@skipUnless(hasattr(socket, "fromfd") and not WINDOWS)
def test_connection_fromfd(self):
    # the fd/family/type reported by psutil must be usable to duplicate
    # one of our own sockets through socket.fromfd()
    listener = socket.socket()
    listener.bind(('localhost', 0))
    listener.listen(1)
    me = psutil.Process(os.getpid())
    found = None
    for candidate in me.get_connections():
        if candidate.fd == listener.fileno():
            found = candidate
            break
    if found is None:
        listener.close()
        self.fail("couldn't find socket fd")
    clone = socket.fromfd(found.fd, found.family, found.type)
    try:
        self.assertEqual(clone.getsockname(), found.local_address)
        self.assertNotEqual(listener.fileno(), clone.fileno())
    finally:
        listener.close()
        clone.close()
def test_get_connections_all(self):
    # Launch one subprocess per (family, type) combination and check what
    # get_connections() reports for each of them, including which values
    # of the 'kind' argument must (or must not) return something.
    from string import Template

    tcp_template = "import socket;" \
                   "s = socket.socket($family, socket.SOCK_STREAM);" \
                   "s.bind(('$addr', 0));" \
                   "s.listen(1);" \
                   "conn, addr = s.accept();"

    udp_template = "import socket, time;" \
                   "s = socket.socket($family, socket.SOCK_DGRAM);" \
                   "s.bind(('$addr', 0));" \
                   "time.sleep(100);"

    # int() keeps the generated script valid even when the AF_* constants
    # are enum members whose str() is not a plain number
    inet4 = int(socket.AF_INET)
    inet6 = int(socket.AF_INET6)
    tcp4_template = Template(tcp_template).substitute(family=inet4,
                                                      addr="127.0.0.1")
    udp4_template = Template(udp_template).substitute(family=inet4,
                                                      addr="127.0.0.1")
    tcp6_template = Template(tcp_template).substitute(family=inet6,
                                                      addr="::1")
    udp6_template = Template(udp_template).substitute(family=inet6,
                                                      addr="::1")

    # launch various subprocess instantiating a socket of various
    # families and types to enrich psutil results
    tcp4_proc = get_test_subprocess([PYTHON, "-c", tcp4_template])
    udp4_proc = get_test_subprocess([PYTHON, "-c", udp4_template])
    if supports_ipv6():
        tcp6_proc = get_test_subprocess([PYTHON, "-c", tcp6_template])
        udp6_proc = get_test_subprocess([PYTHON, "-c", udp6_template])
    else:
        tcp6_proc = None
        udp6_proc = None

    # pid -> (family, type, acceptable local ips, status,
    #         kinds expected to return at least one connection)
    expectations = {
        tcp4_proc.pid: (socket.AF_INET, socket.SOCK_STREAM,
                        ("127.0.0.1",), "LISTEN",
                        ("all", "inet", "inet4", "tcp", "tcp4")),
        udp4_proc.pid: (socket.AF_INET, socket.SOCK_DGRAM,
                        ("127.0.0.1",), "",
                        ("all", "inet", "inet4", "udp", "udp4")),
    }
    if tcp6_proc is not None:
        expectations[tcp6_proc.pid] = (socket.AF_INET6, socket.SOCK_STREAM,
                                       ("::", "::1"), "LISTEN",
                                       ("all", "inet", "inet6", "tcp", "tcp6"))
    if udp6_proc is not None:
        expectations[udp6_proc.pid] = (socket.AF_INET6, socket.SOCK_DGRAM,
                                       ("::", "::1"), "",
                                       ("all", "inet", "inet6", "udp", "udp6"))

    # check matches against subprocesses just created
    all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
                 "udp", "udp4", "udp6")
    for p in psutil.Process(os.getpid()).get_children():
        expected = expectations.get(p.pid)
        if expected is None:
            # not one of the subprocesses started above
            continue
        family, socktype, local_ips, status, matching_kinds = expected
        for conn in p.get_connections():
            self.assertEqual(conn.family, family)
            self.assertEqual(conn.type, socktype)
            self.assertIn(conn.local_address[0], local_ips)
            self.assertEqual(conn.remote_address, ())
            self.assertEqual(conn.status, status)
            for kind in all_kinds:
                cons = p.get_connections(kind=kind)
                if kind in matching_kinds:
                    assert cons != [], cons
                else:
                    self.assertEqual(cons, [], cons)
@skipUnless(POSIX)
def test_get_num_fds(self):
    # opening a file and then a socket must each add one descriptor;
    # closing both must bring the count back to the starting value
    p = psutil.Process(os.getpid())
    start = p.get_num_fds()
    fobj = open(TESTFN, 'w')
    try:
        self.assertEqual(p.get_num_fds(), start + 1)
        sock = socket.socket()
        try:
            self.assertEqual(p.get_num_fds(), start + 2)
        finally:
            sock.close()
    finally:
        # release the descriptors even when an assertion fails so that
        # later tests do not inherit them
        fobj.close()
    self.assertEqual(p.get_num_fds(), start)
def test_get_num_ctx_switches(self):
    # keep sampling until the summed context switch counters grow; give
    # up after 50000 samples
    me = psutil.Process(os.getpid())
    baseline = sum(me.get_num_ctx_switches())
    attempts_left = 50000
    while attempts_left:
        if sum(me.get_num_ctx_switches()) > baseline:
            return
        attempts_left -= 1
    self.fail("num ctx switches still the same after 50.000 iterations")
def test_parent_ppid(self):
    # the child must point back at us through both ppid and parent.pid
    my_pid = os.getpid()
    child = get_test_subprocess()
    child_proc = psutil.Process(child.pid)
    self.assertEqual(child_proc.ppid, my_pid)
    self.assertEqual(child_proc.parent.pid, my_pid)
    # no other process is supposed to have us as parent
    for other in psutil.process_iter():
        if other.pid != child.pid:
            self.assertTrue(other.ppid != my_pid)
def test_get_children(self):
    # no children before spawning, exactly one afterwards, with and
    # without the recursive flag
    me = psutil.Process(os.getpid())
    self.assertEqual(me.get_children(), [])
    self.assertEqual(me.get_children(recursive=True), [])
    child = get_test_subprocess()
    direct = me.get_children()
    nested = me.get_children(recursive=True)
    for found in (direct, nested):
        self.assertEqual(len(found), 1)
        only = found[0]
        self.assertEqual(only.pid, child.pid)
        self.assertEqual(only.ppid, os.getpid())
def test_get_children_recursive(self):
    # here we create a subprocess which creates another one as in:
    # A (parent) -> B (child) -> C (grandchild)
    script = "".join([
        "import subprocess, os, sys, time;",
        "PYTHON = os.path.realpath(sys.executable);",
        "cmd = [PYTHON, '-c', 'import time; time.sleep(3600);'];",
        "subprocess.Popen(cmd);",
        "time.sleep(3600);",
    ])
    get_test_subprocess(cmd=[PYTHON, "-c", script])
    me = psutil.Process(os.getpid())
    self.assertEqual(len(me.get_children(recursive=False)), 1)
    # give the grandchild some time to start
    deadline = time.time() + 1.5
    while time.time() < deadline:
        descendants = me.get_children(recursive=True)
        if len(descendants) > 1:
            break
    self.assertEqual(len(descendants), 2)
    child, grandchild = descendants[0], descendants[1]
    self.assertEqual(child.ppid, os.getpid())
    self.assertEqual(grandchild.ppid, child.pid)
def test_get_children_duplicates(self):
    # find the process which has the highest number of children
    from psutil._compat import defaultdict
    table = defaultdict(int)
    for p in psutil.process_iter():
        try:
            table[p.ppid] += 1
        except psutil.Error:
            pass
    # this is the one, now let's make sure there are no duplicates.
    # The pids are sorted by their children count and the last one is
    # taken; calling max() on the sorted list would just return the
    # numerically largest pid regardless of how many children it has.
    pid = sorted(table, key=lambda x: table[x])[-1]
    p = psutil.Process(pid)
    try:
        c = p.get_children(recursive=True)
    except psutil.AccessDenied:  # windows
        pass
    else:
        self.assertEqual(len(c), len(set(c)))
def test_suspend_resume(self):
    # suspend the child, wait (up to ~1 second) until it is reported as
    # stopped, then resume it
    child = get_test_subprocess(wait=True)
    target = psutil.Process(child.pid)
    target.suspend()
    polls = 0
    while polls < 100 and not (target.status == psutil.STATUS_STOPPED):
        time.sleep(0.01)
        polls += 1
    self.assertEqual(str(target.status), "stopped")
    target.resume()
    assert target.status != psutil.STATUS_STOPPED, target.status
def test_invalid_pid(self):
    # non-integer pids are a programming error...
    for bad_pid in ("1", None):
        self.assertRaises(TypeError, psutil.Process, bad_pid)
    # ...while a negative pid simply does not exist.
    # Refers to Issue #12
    self.assertRaises(psutil.NoSuchProcess, psutil.Process, -1)
def test_as_dict(self):
    child = get_test_subprocess()
    proc = psutil.Process(child.pid)
    full_info = proc.as_dict()
    try:
        import json
    except ImportError:
        pass
    else:
        # the whole dict is supposed to be serializable
        json.dumps(full_info)

    # only the requested attributes must be returned
    subset = proc.as_dict(attrs=['exe', 'name'])
    keys = list(subset.keys())
    keys.sort()
    self.assertEqual(keys, ['exe', 'name'])

    # for the process with the lowest pid the call either yields a list
    # or the fallback value passed as 'ad_value'
    lowest = psutil.Process(min(psutil.get_pid_list()))
    info = lowest.as_dict(attrs=['get_connections'], ad_value='foo')
    if not isinstance(info['connections'], list):
        self.assertEqual(info['connections'], 'foo')
def test_zombie_process(self):
    # Test that NoSuchProcess exception gets raised in case the
    # process dies after we create the Process object.
    # Example:
    #  >>> proc = Process(1234)
    #  >>> time.sleep(5)  # time-consuming task, process dies in meantime
    #  >>> proc.name
    # Refers to Issue #15
    child = get_test_subprocess()
    dead = psutil.Process(child.pid)
    dead.kill()
    dead.wait()

    # names which are skipped by the generic loop below
    skipped = ('pid', 'send_signal', 'is_running', 'set_ionice',
               'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
               'nice')
    for name in dir(dead):
        if name.startswith('_') or name in skipped:
            continue
        try:
            member = getattr(dead, name)
            if callable(member):
                member()
        except psutil.NoSuchProcess:
            continue
        self.fail("NoSuchProcess exception not raised for %r" % name)

    # other methods
    raised = False
    try:
        if os.name == 'posix':
            dead.set_nice(1)
        else:
            dead.set_nice(psutil.NORMAL_PRIORITY_CLASS)
    except psutil.NoSuchProcess:
        raised = True
    if not raised:
        self.fail("exception not raised")
    if hasattr(dead, 'set_ionice'):
        self.assertRaises(psutil.NoSuchProcess, dead.set_ionice, 2)
    self.assertRaises(psutil.NoSuchProcess, dead.send_signal, signal.SIGTERM)
    self.assertRaises(psutil.NoSuchProcess, dead.set_nice, 0)
    self.assertFalse(dead.is_running())
    if hasattr(dead, "set_cpu_affinity"):
        self.assertRaises(psutil.NoSuchProcess, dead.set_cpu_affinity, [0])
def test__str__(self):
    # str() of a live process mentions its pid (and executable name)
    live_child = get_test_subprocess()
    live = psutil.Process(live_child.pid)
    self.assertIn(str(live_child.pid), str(live))
    # python shows up as 'Python' in cmdline on OS X so test fails on OS X
    if not OSX:
        self.assertIn(os.path.basename(PYTHON), str(live))
    # str() of a dead process still mentions the pid and flags it
    dead_child = get_test_subprocess()
    dead = psutil.Process(dead_child.pid)
    dead.kill()
    dead.wait()
    self.assertIn(str(dead_child.pid), str(dead))
    self.assertIn("terminated", str(dead))
@skipIf(LINUX)
def test_pid_0(self):
    # Process(0) is supposed to work on all platforms except Linux
    root_proc = psutil.Process(0)
    self.assertTrue(root_proc.name)

    if os.name == 'posix':
        self.assertEqual(root_proc.uids.real, 0)
        self.assertEqual(root_proc.gids.real, 0)

    self.assertIn(root_proc.ppid, (0, 1))
    #self.assertEqual(p.exe, "")
    self.assertEqual(root_proc.cmdline, [])

    # these may legitimately be denied for pid 0
    for getter_name in ('get_num_threads', 'get_memory_info'):
        try:
            getattr(root_proc, getter_name)()
        except psutil.AccessDenied:
            pass

    # username property
    if POSIX:
        self.assertEqual(root_proc.username, 'root')
    elif WINDOWS:
        self.assertEqual(root_proc.username, 'NT AUTHORITY\\SYSTEM')
    else:
        root_proc.username

    self.assertIn(0, psutil.get_pid_list())
    self.assertTrue(psutil.pid_exists(0))
def test_Popen(self):
    # Popen class test
    # XXX this test causes a ResourceWarning on Python 3 because
    # psutil.__subproc instance doesn't get properly freed.
    # Not sure what to do though.
    command = [PYTHON, "-c", "import time; time.sleep(3600);"]
    proc = psutil.Popen(command, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
    try:
        # 'name' and 'stdin' are both read and probed on the same object
        shared_attrs = ('name', 'stdin')
        for attr_name in shared_attrs:
            getattr(proc, attr_name)
        for attr_name in shared_attrs:
            self.assertTrue(hasattr(proc, attr_name))
        self.assertRaises(AttributeError, getattr, proc, 'foo')
    finally:
        proc.kill()
        proc.wait()
1590 class TestFetchAllProcesses(unittest.TestCase):
1591 # Iterates over all running processes and performs some sanity
1592 # checks against Process API's returned values.
1594 # Python 2.4 compatibility
1595 if not hasattr(unittest.TestCase, "assertIn"):
1596 def assertIn(self, member, container, msg=None):
1597 if member not in container:
1598 self.fail(msg or \
1599 '%s not found in %s' % (repr(member), repr(container)))
1601 def setUp(self):
1602 if POSIX:
1603 import pwd
1604 pall = pwd.getpwall()
1605 self._uids = set([x.pw_uid for x in pall])
1606 self._usernames = set([x.pw_name for x in pall])
1608 def test_fetch_all(self):
1609 valid_procs = 0
1610 excluded_names = ['send_signal', 'suspend', 'resume', 'terminate',
1611 'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice',
1612 'parent', 'get_children', 'pid']
1613 attrs = []
1614 for name in dir(psutil.Process):
1615 if name.startswith("_"):
1616 continue
1617 if name.startswith("set_"):
1618 continue
1619 if name in excluded_names:
1620 continue
1621 attrs.append(name)
1623 for p in psutil.process_iter():
1624 for name in attrs:
1625 try:
1626 try:
1627 attr = getattr(p, name, None)
1628 if attr is not None and callable(attr):
1629 ret = attr()
1630 else:
1631 ret = attr
1632 valid_procs += 1
1633 except (psutil.NoSuchProcess, psutil.AccessDenied):
1634 err = sys.exc_info()[1]
1635 self.assertEqual(err.pid, p.pid)
1636 if err.name:
1637 # make sure exception's name attr is set
1638 # with the actual process name
1639 self.assertEqual(err.name, p.name)
1640 self.assertTrue(str(err))
1641 self.assertTrue(err.msg)
1642 else:
1643 if ret not in (0, 0.0, [], None, ''):
1644 assert ret, ret
1645 meth = getattr(self, name)
1646 meth(ret)
1647 except Exception:
1648 err = sys.exc_info()[1]
1649 trace = traceback.format_exc()
1650 self.fail('%s\nproc=%s, method=%r, retvalue=%r'
1651 % (trace, p, name, ret))
1653 # we should always have a non-empty list, not including PID 0 etc.
1654 # special cases.
1655 self.assertTrue(valid_procs > 0)
1657 def cmdline(self, ret):
1658 pass
1660 def exe(self, ret):
1661 if not ret:
1662 assert ret == ''
1663 else:
1664 assert os.path.isabs(ret), ret
1665 # Note: os.stat() may return False even if the file is there
1666 # hence we skip the test, see:
1667 # http://stackoverflow.com/questions/3112546/os-path-exists-lies
1668 if POSIX:
1669 assert os.path.isfile(ret), ret
1670 if hasattr(os, 'access') and hasattr(os, "X_OK"):
1671 self.assertTrue(os.access(ret, os.X_OK))
1673 def ppid(self, ret):
1674 self.assertTrue(ret >= 0)
1676 def name(self, ret):
1677 self.assertTrue(isinstance(ret, str))
1678 self.assertTrue(ret)
1680 def create_time(self, ret):
1681 self.assertTrue(ret > 0)
1682 if not WINDOWS:
1683 self.assertTrue(ret >= psutil.BOOT_TIME)
1684 # make sure returned value can be pretty printed
1685 # with strftime
1686 time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
1688 def uids(self, ret):
1689 for uid in ret:
1690 self.assertTrue(uid >= 0)
1691 self.assertIn(uid, self._uids)
1693 def gids(self, ret):
1694 # note: testing all gids as above seems not to be reliable for
1695 # gid == 30 (nodoby); not sure why.
1696 for gid in ret:
1697 self.assertTrue(gid >= 0)
1698 #self.assertIn(uid, self.gids)
1700 def username(self, ret):
1701 self.assertTrue(ret)
1702 if os.name == 'posix':
1703 self.assertIn(ret, self._usernames)
1705 def status(self, ret):
1706 self.assertTrue(ret >= 0)
1707 self.assertTrue(str(ret) != '?')
1709 def get_io_counters(self, ret):
1710 for field in ret:
1711 if field != -1:
1712 self.assertTrue(field >= 0)
1714 def get_ionice(self, ret):
1715 self.assertTrue(ret.ioclass >= 0)
1716 self.assertTrue(ret.value >= 0)
1718 def get_num_threads(self, ret):
1719 self.assertTrue(ret >= 1)
1721 def get_threads(self, ret):
1722 for t in ret:
1723 self.assertTrue(t.id >= 0)
1724 self.assertTrue(t.user_time >= 0)
1725 self.assertTrue(t.system_time >= 0)
1727 def get_cpu_times(self, ret):
1728 self.assertTrue(ret.user >= 0)
1729 self.assertTrue(ret.system >= 0)
1731 def get_memory_info(self, ret):
1732 self.assertTrue(ret.rss >= 0)
1733 self.assertTrue(ret.vms >= 0)
1735 def get_ext_memory_info(self, ret):
1736 for name in ret._fields:
1737 self.assertTrue(getattr(ret, name) >= 0)
1738 if POSIX and ret.vms != 0:
1739 # VMS is always supposed to be the highest
1740 for name in ret._fields:
1741 if name != 'vms':
1742 value = getattr(ret, name)
1743 assert ret.vms > value, ret
1744 elif WINDOWS:
1745 assert ret.peak_wset >= ret.wset, ret
1746 assert ret.peak_paged_pool >= ret.paged_pool, ret
1747 assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
1748 assert ret.peak_pagefile >= ret.pagefile, ret
1750 def get_open_files(self, ret):
1751 for f in ret:
1752 if WINDOWS:
1753 assert f.fd == -1, f
1754 else:
1755 assert isinstance(f.fd, int), f
1756 assert os.path.isabs(f.path), f
1757 assert os.path.isfile(f.path), f
1759 def get_num_fds(self, ret):
1760 self.assertTrue(ret >= 0)
1762 def get_connections(self, ret):
1763 # all values are supposed to match Linux's tcp_states.h states
1764 # table across all platforms.
1765 valid_conn_states = ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
1766 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
1767 "LAST_ACK", "LISTEN", "CLOSING", ""]
1768 for conn in ret:
1769 self.assertIn(conn.type, (socket.SOCK_STREAM, socket.SOCK_DGRAM))
1770 self.assertIn(conn.family, (socket.AF_INET, socket.AF_INET6))
1771 check_ip_address(conn.local_address, conn.family)
1772 check_ip_address(conn.remote_address, conn.family)
1773 if conn.status not in valid_conn_states:
1774 self.fail("%s is not a valid status" %conn.status)
1775 # actually try to bind the local socket; ignore IPv6
1776 # sockets as their address might be represented as
1777 # an IPv4-mapped-address (e.g. "::127.0.0.1")
1778 # and that's rejected by bind()
1779 if conn.family == socket.AF_INET:
1780 s = socket.socket(conn.family, conn.type)
1781 s.bind((conn.local_address[0], 0))
1782 s.close()
1784 if not WINDOWS and hasattr(socket, 'fromfd'):
1785 dupsock = None
1786 try:
1787 try:
1788 dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
1789 except (socket.error, OSError):
1790 err = sys.exc_info()[1]
1791 if err.args[0] == errno.EBADF:
1792 continue
1793 raise
1794 # python >= 2.5
1795 if hasattr(dupsock, "family"):
1796 self.assertEqual(dupsock.family, conn.family)
1797 self.assertEqual(dupsock.type, conn.type)
1798 finally:
1799 if dupsock is not None:
1800 dupsock.close()
1802 def getcwd(self, ret):
1803 if ret is not None: # BSD may return None
1804 assert os.path.isabs(ret), ret
1805 try:
1806 st = os.stat(ret)
1807 except OSError:
1808 err = sys.exc_info()[1]
1809 # directory has been removed in mean time
1810 if err.errno != errno.ENOENT:
1811 raise
1812 else:
1813 self.assertTrue(stat.S_ISDIR(st.st_mode))
1815 def get_memory_percent(self, ret):
1816 assert 0 <= ret <= 100, ret
1818 def is_running(self, ret):
1819 self.assertTrue(ret)
1821 def get_cpu_affinity(self, ret):
1822 assert ret != [], ret
1824 def terminal(self, ret):
1825 if ret is not None:
1826 assert os.path.isabs(ret), ret
1827 assert os.path.exists(ret), ret
1829 def get_memory_maps(self, ret):
1830 for nt in ret:
1831 for fname in nt._fields:
1832 value = getattr(nt, fname)
1833 if fname == 'path':
1834 if not value.startswith('['):
1835 assert os.path.isabs(nt.path), nt.path
1836 # commented as on Linux we might get '/foo/bar (deleted)'
1837 #assert os.path.exists(nt.path), nt.path
1838 elif fname in ('addr', 'perms'):
1839 self.assertTrue(value)
1840 else:
1841 assert isinstance(value, (int, long))
1842 assert value >= 0, value
1844 def get_num_handles(self, ret):
1845 if WINDOWS:
1846 assert ret >= 0
1847 else:
1848 assert ret > 0
1850 def get_nice(self, ret):
1851 if POSIX:
1852 assert -20 <= ret <= 20, ret
1853 else:
1854 priorities = [getattr(psutil, x) for x in dir(psutil)
1855 if x.endswith('_PRIORITY_CLASS')]
1856 self.assertIn(ret, priorities)
1858 def get_num_ctx_switches(self, ret):
1859 self.assertTrue(ret.voluntary >= 0)
1860 self.assertTrue(ret.involuntary >= 0)
if hasattr(os, 'getuid'):
    class LimitedUserTestCase(TestCase):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestCase.__init__(self, *args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)
                # 'meth' is bound as a default argument: a plain closure
                # would look the name up when the test runs and every
                # wrapper would end up calling the last method of the loop
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            # drop the group first, while the effective user still has
            # the privileges required to do so
            os.setegid(1000)
            os.seteuid(1000)
            TestCase.setUp(self)

        def tearDown(self):
            # restore the original effective ids: user first, then group,
            # each one with its own value
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            TestCase.tearDown(self)

        def test_nice(self):
            # a limited user is not allowed to lower the nice value
            try:
                psutil.Process(os.getpid()).set_nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")
def cleanup():
    """Release the resources shared by the whole test run.

    Reaps the test subprocesses, closes the DEVNULL file object and
    removes the TESTFN scratch file.
    """
    reap_children(search_all=True)
    DEVNULL.close()
    safe_remove(TESTFN)

# run the cleanup when the interpreter exits
atexit.register(cleanup)
def test_main():
    """Collect the generic and the platform specific test cases and run them.

    The limited user tests are added only when the suite runs as root;
    otherwise a warning is registered to be shown at interpreter exit.
    """
    tests = []
    test_suite = unittest.TestSuite()
    tests.append(TestCase)
    tests.append(TestFetchAllProcesses)

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite; 'stc' stays None on
    # platforms which have no dedicated suite so that the generic tests
    # still run there instead of crashing with a NameError
    stc = None
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    if stc is not None:
        tests.append(stc)

    if hasattr(os, 'getuid'):
        if os.getuid() == 0:
            tests.append(LimitedUserTestCase)
        else:
            atexit.register(warn, "Couldn't run limited user tests ("
                                  "super-user privileges are required)")

    for test_class in tests:
        test_suite.addTest(unittest.makeSuite(test_class))
    unittest.TextTestRunner(verbosity=2).run(test_suite)
# run the whole suite when the module is executed as a script
if __name__ == '__main__':
    test_main()