3 # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4 # Use of this source code is governed by a BSD-style license that can be
5 # found in the LICENSE file.
8 psutil test suite (you can quickly run it with "python setup.py test").
10 Note: this is targeted for both python 2.x and 3.x so there's no need
11 to use 2to3 tool first.
13 If you're on Python < 2.7 it is recommended to install unittest2 module
14 from: https://pypi.python.org/pypi/unittest2
17 from __future__
import division
35 import unittest2
as unittest
# python < 2.7 + unittest2 installed
39 import ast
# python >= 2.6
44 from psutil
._compat
import PY3
, callable, long, wraps
47 # ===================================================================
49 # ===================================================================
51 # conf for retry_before_failing() decorator
# bytes tolerance for OS memory related tests
TOLERANCE = 500 * 1024  # 500KB

# Symlink-resolved path of the interpreter running this test suite.
PYTHON = os.path.realpath(sys.executable)
# Null device, opened read/write at import time; used as the default
# stdout/stderr/stdin of get_test_subprocess().
DEVNULL = open(os.devnull, 'r+')
# Path of a scratch file located in the current working directory.
TESTFN = os.path.join(os.getcwd(), "$testfile")
# Absolute path of the 'examples' directory, taken one level above the
# directory that contains this file.
EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(
               os.path.dirname(__file__)), 'examples'))
# Platform flags, derived from os.name / sys.platform prefixes.
POSIX = os.name == 'posix'
LINUX = sys.platform.startswith("linux")
WINDOWS = sys.platform.startswith("win32")
OSX = sys.platform.startswith("darwin")
# NOTE: only matches platforms whose name starts with "freebsd".
BSD = sys.platform.startswith("freebsd")
SUNOS = sys.platform.startswith("sunos")
69 # ===================================================================
70 # --- Utility functions
71 # ===================================================================
73 _subprocesses_started
= set()
75 def get_test_subprocess(cmd
=None, stdout
=DEVNULL
, stderr
=DEVNULL
, stdin
=DEVNULL
,
77 """Return a subprocess.Popen object to use in tests.
78 By default stdout and stderr are redirected to /dev/null and the
79 python interpreter is used as test process.
80 If 'wait' is True attempts to make sure the process is in a
81 reasonably initialized state.
86 pyline
+= "open(r'%s', 'w'); " % TESTFN
87 pyline
+= "import time; time.sleep(2);"
88 cmd_
= [PYTHON
, "-c", pyline
]
91 sproc
= subprocess
.Popen(cmd_
, stdout
=stdout
, stderr
=stderr
, stdin
=stdin
)
94 stop_at
= time
.time() + 3
95 while stop_at
> time
.time():
96 if os
.path
.exists(TESTFN
):
100 warn("couldn't make sure test file was actually created")
102 wait_for_pid(sproc
.pid
)
103 _subprocesses_started
.add(sproc
.pid
)
107 """Raise a warning msg."""
108 warnings
.warn(msg
, UserWarning)
def register_warning(msg):
    """Arrange for *msg* to be passed to warn() when the interpreter exits."""
    def _emit_at_exit():
        # warn() is resolved only when the exit handler actually runs
        warn(msg)

    atexit.register(_emit_at_exit)
114 def sh(cmdline
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
):
115 """run cmd in a subprocess and return its output.
116 raises RuntimeError on error.
118 p
= subprocess
.Popen(cmdline
, shell
=True, stdout
=stdout
, stderr
=stderr
)
119 stdout
, stderr
= p
.communicate()
120 if p
.returncode
!= 0:
121 raise RuntimeError(stderr
)
125 stdout
= str(stdout
, sys
.stdout
.encoding
)
126 return stdout
.strip()
129 """Same as UNIX which command. Return None on command not found."""
131 return os
.path
.isfile(fpath
) and os
.access(fpath
, os
.X_OK
)
133 fpath
, fname
= os
.path
.split(program
)
138 for path
in os
.environ
["PATH"].split(os
.pathsep
):
139 exe_file
= os
.path
.join(path
, program
)
144 def wait_for_pid(pid
, timeout
=1):
145 """Wait for pid to show up in the process list then return.
146 Used in the test suite to give time the sub process to initialize.
148 raise_at
= time
.time() + timeout
150 if pid
in psutil
.get_pid_list():
151 # give it one more iteration to allow full initialization
155 if time
.time() >= raise_at
:
156 raise RuntimeError("Timed out")
158 def reap_children(search_all
=False):
159 """Kill any subprocess started by this test suite and ensure that
160 no zombies stick around to hog resources and create problems when
161 looking for refleaks.
163 pids
= _subprocesses_started
165 this_process
= psutil
.Process(os
.getpid())
166 for p
in this_process
.get_children(recursive
=True):
171 child
= psutil
.Process(pid
)
173 except psutil
.NoSuchProcess
:
175 except psutil
.AccessDenied
:
176 warn("couldn't kill child process with pid %s" % pid
)
178 child
.wait(timeout
=3)
180 def check_ip_address(addr
, family
):
181 """Attempts to check IP address's validity."""
185 assert isinstance(port
, int), port
186 if family
== socket
.AF_INET
:
187 ip
= list(map(int, ip
.split('.')))
188 assert len(ip
) == 4, ip
190 assert 0 <= num
<= 255, ip
191 assert 0 <= port
<= 65535, port
193 def safe_remove(fname
):
194 """Deletes a file and does not exception if it doesn't exist."""
198 err
= sys
.exc_info()[1]
199 if err
.args
[0] != errno
.ENOENT
:
202 def call_until(fun
, expr
, timeout
=1):
203 """Keep calling function for timeout secs and exit if eval()
206 stop_at
= time
.time() + timeout
207 while time
.time() < stop_at
:
212 raise RuntimeError('timed out (ret=%r)' % ret
)
214 def retry_before_failing(ntimes
=None):
215 """Decorator which runs a test function and retries N times before
220 def wrapper(*args
, **kwargs
):
221 for x
in range(ntimes
or NO_RETRIES
):
223 return fun(*args
, **kwargs
)
224 except AssertionError:
225 err
= sys
.exc_info()[1]
230 def skip_on_access_denied(only_if
=None):
231 """Decorator to Ignore AccessDenied exceptions."""
234 def wrapper(*args
, **kwargs
):
236 return fun(*args
, **kwargs
)
237 except psutil
.AccessDenied
:
238 if only_if
is not None:
241 msg
= "%r was skipped because it raised AccessDenied" \
244 if hasattr(self
, 'skip'): # python >= 2.7
247 register_warning(msg
)
251 def skip_on_not_implemented(only_if
=None):
252 """Decorator to Ignore NotImplementedError exceptions."""
255 def wrapper(*args
, **kwargs
):
257 return fun(*args
, **kwargs
)
258 except NotImplementedError:
259 if only_if
is not None:
262 msg
= "%r was skipped because it raised NotImplementedError" \
265 if hasattr(self
, 'skip'): # python >= 2.7
268 register_warning(msg
)
273 """Return True if IPv6 is supported on this platform."""
274 if not socket
.has_ipv6
or not hasattr(socket
, "AF_INET6"):
279 sock
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
280 sock
.bind(("::1", 0))
281 except (socket
.error
, socket
.gaierror
):
290 class ThreadTask(threading
.Thread
):
291 """A thread object used for running process thread tests."""
294 threading
.Thread
.__init
__(self
)
295 self
._running
= False
296 self
._interval
= None
297 self
._flag
= threading
.Event()
300 name
= self
.__class
__.__name
__
301 return '<%s running=%s at %#x>' % (name
, self
._running
, id(self
))
303 def start(self
, interval
=0.001):
304 """Start thread and keep it running until an explicit
305 stop() request. Polls for shutdown every 'timeout' seconds.
308 raise ValueError("already started")
309 self
._interval
= interval
310 threading
.Thread
.start(self
)
317 time
.sleep(self
._interval
)
320 """Stop thread execution and wait until it is stopped."""
321 if not self
._running
:
322 raise ValueError("already stopped")
323 self
._running
= False
327 # ===================================================================
328 # --- Support for python < 2.7 in case unittest2 is not installed
329 # ===================================================================
331 if not hasattr(unittest
, 'skip'):
332 register_warning("unittest2 module is not installed; a serie of pretty " \
333 "darn ugly workarounds will be used")
335 class SkipTest(Exception):
338 class TestCase(unittest
.TestCase
):
340 def _safe_repr(self
, obj
):
345 result
= object.__repr
__(obj
)
346 if len(result
) < MAX_LENGTH
:
348 return result
[:MAX_LENGTH
] + ' [truncated]...'
350 def _fail_w_msg(self
, a
, b
, middle
, msg
):
351 self
.fail(msg
or '%s %s %s' % (self
._safe
_repr
(a
), middle
,
357 def assertIn(self
, a
, b
, msg
=None):
359 self
._fail
_w
_msg
(a
, b
, 'not found in', msg
)
361 def assertNotIn(self
, a
, b
, msg
=None):
363 self
._fail
_w
_msg
(a
, b
, 'found in', msg
)
365 def assertGreater(self
, a
, b
, msg
=None):
367 self
._fail
_w
_msg
(a
, b
, 'not greater than', msg
)
369 def assertGreaterEqual(self
, a
, b
, msg
=None):
371 self
._fail
_w
_msg
(a
, b
, 'not greater than or equal to', msg
)
373 def assertLess(self
, a
, b
, msg
=None):
375 self
._fail
_w
_msg
(a
, b
, 'not less than', msg
)
377 def assertLessEqual(self
, a
, b
, msg
=None):
379 self
._fail
_w
_msg
(a
, b
, 'not less or equal to', msg
)
def assertIsInstance(self, a, b, msg=None):
    """Fail the test unless *a* is an instance of *b*.

    *msg*, when truthy, replaces the default failure message.
    """
    if isinstance(a, b):
        return
    # the default text is only built when no custom message was supplied
    failure_text = msg or '%s is not an instance of %r' % (
        self._safe_repr(a), b)
    self.fail(failure_text)
386 def assertAlmostEqual(self
, a
, b
, msg
=None, delta
=None):
387 if delta
is not None:
388 if abs(a
- b
) <= delta
:
390 self
.fail(msg
or '%s != %s within %s delta' \
391 % (self
._safe
_repr
(a
), self
._safe
_repr
(b
),
392 self
._safe
_repr
(delta
)))
394 self
.assertEqual(a
, b
, msg
=msg
)
397 def skipIf(condition
, reason
):
400 def wrapper(*args
, **kwargs
):
403 sys
.stdout
.write("skipped-")
406 objname
= "%s.%s" % (self
.__class
__.__name
__,
408 msg
= "%s was skipped" % objname
410 msg
+= "; reason: " + repr(reason
)
411 register_warning(msg
)
414 return fun(*args
, **kwargs
)
418 def skipUnless(condition
, reason
):
420 return unittest
.skipIf(True, reason
)
421 return unittest
.skipIf(False, reason
)
423 unittest
.TestCase
= TestCase
424 unittest
.skipIf
= skipIf
425 unittest
.skipUnless
= skipUnless
426 del TestCase
, skipIf
, skipUnless
429 # ===================================================================
430 # --- System-related API tests
431 # ===================================================================
433 class TestSystemAPIs(unittest
.TestCase
):
434 """Tests for system-related APIs."""
442 def test_process_iter(self
):
443 self
.assertIn(os
.getpid(), [x
.pid
for x
in psutil
.process_iter()])
444 sproc
= get_test_subprocess()
445 self
.assertIn(sproc
.pid
, [x
.pid
for x
in psutil
.process_iter()])
446 p
= psutil
.Process(sproc
.pid
)
449 self
.assertNotIn(sproc
.pid
, [x
.pid
for x
in psutil
.process_iter()])
def test_TOTAL_PHYMEM(self):
    """TOTAL_PHYMEM is a positive integer equal to virtual_memory().total."""
    total_phymem = psutil.TOTAL_PHYMEM
    integer_types = (int, long)
    self.assertIsInstance(total_phymem, integer_types)
    self.assertGreater(total_phymem, 0)
    vmem = psutil.virtual_memory()
    self.assertEqual(total_phymem, vmem.total)
def test_BOOT_TIME(self, arg=None):
    """Check a boot time value: a float, greater than 0, earlier than now.

    The value checked is *arg* when it is truthy, psutil.BOOT_TIME
    otherwise.
    """
    if arg:
        boot_time = arg
    else:
        boot_time = psutil.BOOT_TIME
    self.assertIsInstance(boot_time, float)
    self.assertGreater(boot_time, 0)
    now = time.time()
    self.assertLess(boot_time, now)
463 def test_get_boot_time(self
):
464 self
.test_BOOT_TIME(psutil
.get_boot_time())
466 # work around float precision issues; give it 1 secs tolerance
467 diff
= abs(psutil
.get_boot_time() - psutil
.BOOT_TIME
)
468 self
.assertLess(diff
, 1)
470 self
.assertEqual(psutil
.get_boot_time(), psutil
.BOOT_TIME
)
def test_NUM_CPUS(self):
    """NUM_CPUS matches the number of per-CPU time entries and is >= 1."""
    declared = psutil.NUM_CPUS
    per_cpu_times = psutil.cpu_times(percpu=True)
    self.assertEqual(declared, len(per_cpu_times))
    self.assertGreaterEqual(psutil.NUM_CPUS, 1)
476 @unittest.skipUnless(POSIX
, 'posix only')
477 def test_PAGESIZE(self
):
478 # pagesize is used internally to perform different calculations
479 # and it's determined by using SC_PAGE_SIZE; make sure
480 # getpagesize() returns the same value.
482 self
.assertEqual(os
.sysconf("SC_PAGE_SIZE"), resource
.getpagesize())
484 def test_deprecated_apis(self
):
486 s
.bind(('localhost', 0))
488 warnings
.filterwarnings("error")
489 p
= psutil
.Process(os
.getpid())
491 self
.assertRaises(DeprecationWarning, psutil
.virtmem_usage
)
492 self
.assertRaises(DeprecationWarning, psutil
.used_phymem
)
493 self
.assertRaises(DeprecationWarning, psutil
.avail_phymem
)
494 self
.assertRaises(DeprecationWarning, psutil
.total_virtmem
)
495 self
.assertRaises(DeprecationWarning, psutil
.used_virtmem
)
496 self
.assertRaises(DeprecationWarning, psutil
.avail_virtmem
)
497 self
.assertRaises(DeprecationWarning, psutil
.phymem_usage
)
498 self
.assertRaises(DeprecationWarning, psutil
.get_process_list
)
499 self
.assertRaises(DeprecationWarning, psutil
.network_io_counters
)
501 self
.assertRaises(DeprecationWarning, psutil
.phymem_buffers
)
502 self
.assertRaises(DeprecationWarning, psutil
.cached_phymem
)
505 except DeprecationWarning:
508 self
.fail("p.nice didn't raise DeprecationWarning")
509 ret
= call_until(p
.get_connections
, "len(ret) != 0", timeout
=1)
510 self
.assertRaises(DeprecationWarning,
511 getattr, ret
[0], 'local_address')
512 self
.assertRaises(DeprecationWarning,
513 getattr, ret
[0], 'remote_address')
516 warnings
.resetwarnings()
518 def test_deprecated_apis_retval(self
):
519 warnings
.filterwarnings("ignore")
520 p
= psutil
.Process(os
.getpid())
522 self
.assertEqual(psutil
.total_virtmem(), psutil
.swap_memory().total
)
523 self
.assertEqual(p
.nice
, p
.get_nice())
525 warnings
.resetwarnings()
527 def test_virtual_memory(self
):
528 mem
= psutil
.virtual_memory()
529 assert mem
.total
> 0, mem
530 assert mem
.available
> 0, mem
531 assert 0 <= mem
.percent
<= 100, mem
532 assert mem
.used
> 0, mem
533 assert mem
.free
>= 0, mem
534 for name
in mem
._fields
:
536 value
= getattr(mem
, name
)
538 self
.fail("%r < 0 (%s)" % (name
, value
))
539 if value
> mem
.total
:
540 self
.fail("%r > total (total=%s, %s=%s)" \
541 % (name
, mem
.total
, name
, value
))
def test_swap_memory(self):
    """Sanity-check the fields of the tuple returned by psutil.swap_memory().

    Every counter must be non-negative and the usage percentage must lie
    within [0, 100].
    """
    mem = psutil.swap_memory()
    assert mem.total >= 0, mem
    assert mem.used >= 0, mem
    # total is allowed to be 0 above (e.g. a host with no swap configured),
    # in which case free is 0 as well: require non-negative, not positive.
    assert mem.free >= 0, mem
    assert 0 <= mem.percent <= 100, mem
    assert mem.sin >= 0, mem
    assert mem.sout >= 0, mem
552 def test_pid_exists(self
):
553 sproc
= get_test_subprocess(wait
=True)
554 assert psutil
.pid_exists(sproc
.pid
)
555 p
= psutil
.Process(sproc
.pid
)
558 self
.assertFalse(psutil
.pid_exists(sproc
.pid
))
559 self
.assertFalse(psutil
.pid_exists(-1))
561 def test_pid_exists_2(self
):
563 pids
= psutil
.get_pid_list()
566 assert psutil
.pid_exists(pid
)
567 except AssertionError:
568 # in case the process disappeared in meantime fail only
569 # if it is no longer in get_pid_list()
571 if pid
in psutil
.get_pid_list():
573 pids
= range(max(pids
) + 5000, max(pids
) + 6000)
575 self
.assertFalse(psutil
.pid_exists(pid
))
def test_get_pid_list(self):
    """get_pid_list() agrees with process_iter() and has no duplicates."""
    plist = [x.pid for x in psutil.process_iter()]
    pidlist = psutil.get_pid_list()
    # list.sort() sorts in place and returns None, so comparing the two
    # .sort() results would always pass; compare sorted copies instead.
    # NOTE(review): the two snapshots are taken at slightly different
    # times, so a process starting or exiting in between can make them
    # differ.
    self.assertEqual(sorted(plist), sorted(pidlist))
    # make sure every pid is unique
    self.assertEqual(len(pidlist), len(set(pidlist)))
585 # test for psutil.test() function
593 def test_sys_cpu_times(self
):
595 times
= psutil
.cpu_times()
597 for cp_time
in times
:
598 self
.assertIsInstance(cp_time
, float)
599 self
.assertGreaterEqual(cp_time
, 0.0)
601 self
.assertEqual(total
, sum(times
))
604 def test_sys_cpu_times2(self
):
605 t1
= sum(psutil
.cpu_times())
607 t2
= sum(psutil
.cpu_times())
609 if not difference
>= 0.05:
610 self
.fail("difference %s" % difference
)
612 def test_sys_per_cpu_times(self
):
613 for times
in psutil
.cpu_times(percpu
=True):
616 for cp_time
in times
:
617 self
.assertIsInstance(cp_time
, float)
618 self
.assertGreaterEqual(cp_time
, 0.0)
620 self
.assertEqual(total
, sum(times
))
622 self
.assertEqual(len(psutil
.cpu_times(percpu
=True)[0]),
623 len(psutil
.cpu_times(percpu
=False)))
625 def test_sys_per_cpu_times2(self
):
626 tot1
= psutil
.cpu_times(percpu
=True)
627 stop_at
= time
.time() + 0.1
629 if time
.time() >= stop_at
:
631 tot2
= psutil
.cpu_times(percpu
=True)
632 for t1
, t2
in zip(tot1
, tot2
):
633 t1
, t2
= sum(t1
), sum(t2
)
635 if difference
>= 0.05:
def _test_cpu_percent(self, percent):
    """Check that *percent* is a float lying within [0.0, 100.0]."""
    lowest, highest = 0.0, 100.0
    self.assertIsInstance(percent, float)
    self.assertGreaterEqual(percent, lowest)
    self.assertLessEqual(percent, highest)
def test_sys_cpu_percent(self):
    """Validate 1000 system-wide cpu_percent() readings.

    One call with a 0.001 interval is made first; the readings that are
    checked are then taken with interval=None.
    """
    psutil.cpu_percent(interval=0.001)
    remaining = 1000
    while remaining:
        reading = psutil.cpu_percent(interval=None)
        self._test_cpu_percent(reading)
        remaining -= 1
649 def test_sys_per_cpu_percent(self
):
650 self
.assertEqual(len(psutil
.cpu_percent(interval
=0.001, percpu
=True)),
652 for x
in range(1000):
653 percents
= psutil
.cpu_percent(interval
=None, percpu
=True)
654 for percent
in percents
:
655 self
._test
_cpu
_percent
(percent
)
657 def test_sys_cpu_times_percent(self
):
658 psutil
.cpu_times_percent(interval
=0.001)
659 for x
in range(1000):
660 cpu
= psutil
.cpu_times_percent(interval
=None)
662 self
._test
_cpu
_percent
(percent
)
663 self
._test
_cpu
_percent
(sum(cpu
))
665 def test_sys_per_cpu_times_percent(self
):
666 self
.assertEqual(len(psutil
.cpu_times_percent(interval
=0.001,
669 for x
in range(1000):
670 cpus
= psutil
.cpu_times_percent(interval
=None, percpu
=True)
673 self
._test
_cpu
_percent
(percent
)
674 self
._test
_cpu
_percent
(sum(cpu
))
676 @unittest.skipIf(POSIX
and not hasattr(os
, 'statvfs'),
677 "os.statvfs() function not available on this platform")
678 def test_disk_usage(self
):
679 usage
= psutil
.disk_usage(os
.getcwd())
680 assert usage
.total
> 0, usage
681 assert usage
.used
> 0, usage
682 assert usage
.free
> 0, usage
683 assert usage
.total
> usage
.used
, usage
684 assert usage
.total
> usage
.free
, usage
685 assert 0 <= usage
.percent
<= 100, usage
.percent
687 # if path does not exist OSError ENOENT is expected across
689 fname
= tempfile
.mktemp()
691 psutil
.disk_usage(fname
)
693 err
= sys
.exc_info()[1]
694 if err
.args
[0] != errno
.ENOENT
:
697 self
.fail("OSError not raised")
699 @unittest.skipIf(POSIX
and not hasattr(os
, 'statvfs'),
700 "os.statvfs() function not available on this platform")
701 def test_disk_partitions(self
):
703 for disk
in psutil
.disk_partitions(all
=False):
704 if WINDOWS
and 'cdrom' in disk
.opts
:
707 assert os
.path
.exists(disk
.device
), disk
709 # we cannot make any assumption about this, see:
710 # http://goo.gl/p9c43
713 # on solaris apparently mount points can also be files
714 assert os
.path
.exists(disk
.mountpoint
), disk
716 assert os
.path
.isdir(disk
.mountpoint
), disk
717 assert disk
.fstype
, disk
718 self
.assertIsInstance(disk
.opts
, str)
721 for disk
in psutil
.disk_partitions(all
=True):
724 os
.stat(disk
.mountpoint
)
726 # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
727 err
= sys
.exc_info()[1]
728 if err
.errno
not in (errno
.EPERM
, errno
.EACCES
):
732 # on solaris apparently mount points can also be files
733 assert os
.path
.exists(disk
.mountpoint
), disk
735 assert os
.path
.isdir(disk
.mountpoint
), disk
736 self
.assertIsInstance(disk
.fstype
, str)
737 self
.assertIsInstance(disk
.opts
, str)
739 def find_mount_point(path
):
740 path
= os
.path
.abspath(path
)
741 while not os
.path
.ismount(path
):
742 path
= os
.path
.dirname(path
)
745 mount
= find_mount_point(__file__
)
746 mounts
= [x
.mountpoint
for x
in psutil
.disk_partitions(all
=True)]
747 self
.assertIn(mount
, mounts
)
748 psutil
.disk_usage(mount
)
750 def test_net_io_counters(self
):
def check_ntuple(nt):
    """Check field order and non-negativity of a network I/O tuple."""
    expected_order = ('bytes_sent', 'bytes_recv',
                      'packets_sent', 'packets_recv',
                      'errin', 'errout', 'dropin', 'dropout')
    # positional access must match named access, field by field
    for position, field in enumerate(expected_order):
        self.assertEqual(nt[position], getattr(nt, field))
    # every counter is expected to be non-negative
    for field in expected_order:
        assert getattr(nt, field) >= 0, nt
769 ret
= psutil
.net_io_counters(pernic
=False)
771 ret
= psutil
.net_io_counters(pernic
=True)
775 check_ntuple(ret
[key
])
777 def test_disk_io_counters(self
):
def check_ntuple(nt):
    """Check field order and non-negativity of a disk I/O tuple."""
    expected_order = ('read_count', 'write_count',
                      'read_bytes', 'write_bytes',
                      'read_time', 'write_time')
    # positional access must match named access, field by field
    for position, field in enumerate(expected_order):
        self.assertEqual(nt[position], getattr(nt, field))
    # every counter is expected to be non-negative
    for field in expected_order:
        assert getattr(nt, field) >= 0, nt
792 ret
= psutil
.disk_io_counters(perdisk
=False)
794 ret
= psutil
.disk_io_counters(perdisk
=True)
795 # make sure there are no duplicates
796 self
.assertEqual(len(ret
), len(set(ret
)))
799 check_ntuple(ret
[key
])
800 if LINUX
and key
[-1].isdigit():
801 # if 'sda1' is listed 'sda' shouldn't, see:
802 # http://code.google.com/p/psutil/issues/detail?id=338
803 while key
[-1].isdigit():
805 self
.assertNotIn(key
, ret
.keys())
807 def test_get_users(self
):
808 users
= psutil
.get_users()
811 assert user
.name
, user
814 assert user
.started
> 0.0, user
815 datetime
.datetime
.fromtimestamp(user
.started
)
818 # ===================================================================
819 # --- psutil.Process class tests
820 # ===================================================================
822 class TestProcess(unittest
.TestCase
):
823 """Tests for psutil.Process class."""
832 sproc
= get_test_subprocess(wait
=True)
834 p
= psutil
.Process(test_pid
)
838 self
.assertFalse(psutil
.pid_exists(test_pid
) and name
== PYTHON
)
840 def test_terminate(self
):
841 sproc
= get_test_subprocess(wait
=True)
843 p
= psutil
.Process(test_pid
)
847 self
.assertFalse(psutil
.pid_exists(test_pid
) and name
== PYTHON
)
849 def test_send_signal(self
):
854 sproc
= get_test_subprocess()
856 p
= psutil
.Process(test_pid
)
860 self
.assertFalse(psutil
.pid_exists(test_pid
) and name
== PYTHON
)
863 # check exit code signal
864 sproc
= get_test_subprocess()
865 p
= psutil
.Process(sproc
.pid
)
868 if os
.name
== 'posix':
869 self
.assertEqual(code
, signal
.SIGKILL
)
871 self
.assertEqual(code
, 0)
872 self
.assertFalse(p
.is_running())
874 sproc
= get_test_subprocess()
875 p
= psutil
.Process(sproc
.pid
)
878 if os
.name
== 'posix':
879 self
.assertEqual(code
, signal
.SIGTERM
)
881 self
.assertEqual(code
, 0)
882 self
.assertFalse(p
.is_running())
884 # check sys.exit() code
885 code
= "import time, sys; time.sleep(0.01); sys.exit(5);"
886 sproc
= get_test_subprocess([PYTHON
, "-c", code
])
887 p
= psutil
.Process(sproc
.pid
)
888 self
.assertEqual(p
.wait(), 5)
889 self
.assertFalse(p
.is_running())
891 # Test wait() issued twice.
892 # It is not supposed to raise NSP when the process is gone.
893 # On UNIX this should return None, on Windows it should keep
894 # returning the exit code.
895 sproc
= get_test_subprocess([PYTHON
, "-c", code
])
896 p
= psutil
.Process(sproc
.pid
)
897 self
.assertEqual(p
.wait(), 5)
898 self
.assertIn(p
.wait(), (5, None))
901 sproc
= get_test_subprocess()
902 p
= psutil
.Process(sproc
.pid
)
904 self
.assertRaises(psutil
.TimeoutExpired
, p
.wait
, 0.01)
906 # timeout < 0 not allowed
907 self
.assertRaises(ValueError, p
.wait
, -1)
909 @unittest.skipUnless(POSIX
, '') # XXX why is this skipped on Windows?
910 def test_wait_non_children(self
):
911 # test wait() against processes which are not our children
913 code
+= "from subprocess import Popen, PIPE;"
914 code
+= "cmd = ['%s', '-c', 'import time; time.sleep(2)'];" %PYTHON
915 code
+= "sp = Popen(cmd, stdout=PIPE);"
916 code
+= "sys.stdout.write(str(sp.pid));"
917 sproc
= get_test_subprocess([PYTHON
, "-c", code
], stdout
=subprocess
.PIPE
)
919 grandson_pid
= int(sproc
.stdout
.read())
920 grandson_proc
= psutil
.Process(grandson_pid
)
922 self
.assertRaises(psutil
.TimeoutExpired
, grandson_proc
.wait
, 0.01)
924 ret
= grandson_proc
.wait()
925 self
.assertEqual(ret
, None)
927 if grandson_proc
.is_running():
931 def test_wait_timeout_0(self
):
932 sproc
= get_test_subprocess()
933 p
= psutil
.Process(sproc
.pid
)
934 self
.assertRaises(psutil
.TimeoutExpired
, p
.wait
, 0)
936 stop_at
= time
.time() + 2
940 except psutil
.TimeoutExpired
:
941 if time
.time() >= stop_at
:
945 if os
.name
== 'posix':
946 self
.assertEqual(code
, signal
.SIGKILL
)
948 self
.assertEqual(code
, 0)
949 self
.assertFalse(p
.is_running())
951 def test_cpu_percent(self
):
952 p
= psutil
.Process(os
.getpid())
953 p
.get_cpu_percent(interval
=0.001)
954 p
.get_cpu_percent(interval
=0.001)
956 percent
= p
.get_cpu_percent(interval
=None)
957 self
.assertIsInstance(percent
, float)
958 self
.assertGreaterEqual(percent
, 0.0)
959 if os
.name
!= 'posix':
960 self
.assertLessEqual(percent
, 100.0)
962 self
.assertGreaterEqual(percent
, 0.0)
def test_cpu_times(self):
    """CPU times of the current process are non-zero and strftime-friendly."""
    this_process = psutil.Process(os.getpid())
    times = this_process.get_cpu_times()
    consumed_cpu = (times.user > 0.0) or (times.system > 0.0)
    assert consumed_cpu, times
    # make sure returned values can be pretty printed with strftime
    for seconds in (times.user, times.system):
        time.strftime("%H:%M:%S", time.localtime(seconds))
971 # Test Process.cpu_times() against os.times()
972 # os.times() is broken on Python 2.6
973 # http://bugs.python.org/issue1040026
974 # XXX fails on OSX: not sure if it's for os.times(). We should
975 # try this with Python 2.7 and re-enable the test.
977 @unittest.skipUnless(sys
.version_info
> (2, 6, 1) and not OSX
,
978 'os.times() is not reliable on this Python version')
def test_cpu_times2(self):
    """Compare Process.get_cpu_times() against os.times() for this process."""
    user_time, kernel_time = psutil.Process(os.getpid()).get_cpu_times()
    utime, ktime = os.times()[:2]

    # os.times()[:2] provides the reference values; each psutil value may
    # deviate from its reference by at most 0.1 seconds. User time is
    # checked first, then kernel time.
    comparisons = ((utime, user_time), (ktime, kernel_time))
    for expected, found in comparisons:
        pair = [found, expected]
        if max(pair) - min(pair) > 0.1:
            self.fail("expected: %s, found: %s" % (expected, found))
992 def test_create_time(self
):
993 sproc
= get_test_subprocess(wait
=True)
995 p
= psutil
.Process(sproc
.pid
)
996 create_time
= p
.create_time
998 # Use time.time() as base value to compare our result using a
999 # tolerance of +/- 1 second.
1000 # It will fail if the difference between the values is > 2s.
1001 difference
= abs(create_time
- now
)
1003 self
.fail("expected: %s, found: %s, difference: %s"
1004 % (now
, create_time
, difference
))
1006 # make sure returned value can be pretty printed with strftime
1007 time
.strftime("%Y %m %d %H:%M:%S", time
.localtime(p
.create_time
))
1009 @unittest.skipIf(WINDOWS
, 'windows only')
1010 def test_terminal(self
):
1011 terminal
= psutil
.Process(os
.getpid()).terminal
1012 if sys
.stdin
.isatty():
1013 self
.assertEqual(terminal
, sh('tty'))
1015 assert terminal
, repr(terminal
)
1017 @unittest.skipIf(not hasattr(psutil
.Process
, 'get_io_counters'),
1018 'not available on this platform')
1019 @skip_on_not_implemented(only_if
=LINUX
)
1020 def test_get_io_counters(self
):
1021 p
= psutil
.Process(os
.getpid())
1023 io1
= p
.get_io_counters()
1024 f
= open(PYTHON
, 'rb')
1027 io2
= p
.get_io_counters()
1029 assert io2
.read_count
> io1
.read_count
, (io1
, io2
)
1030 self
.assertEqual(io2
.write_count
, io1
.write_count
)
1031 assert io2
.read_bytes
>= io1
.read_bytes
, (io1
, io2
)
1032 assert io2
.write_bytes
>= io1
.write_bytes
, (io1
, io2
)
1034 io1
= p
.get_io_counters()
1035 f
= tempfile
.TemporaryFile()
1037 f
.write(bytes("x" * 1000000, 'ascii'))
1039 f
.write("x" * 1000000)
1041 io2
= p
.get_io_counters()
1042 assert io2
.write_count
>= io1
.write_count
, (io1
, io2
)
1043 assert io2
.write_bytes
>= io1
.write_bytes
, (io1
, io2
)
1044 assert io2
.read_count
>= io1
.read_count
, (io1
, io2
)
1045 assert io2
.read_bytes
>= io1
.read_bytes
, (io1
, io2
)
1047 # Linux and Windows Vista+
1048 @unittest.skipUnless(hasattr(psutil
.Process
, 'get_ionice'),
1049 'Linux and Windows Vista only')
1050 def test_get_set_ionice(self
):
1052 from psutil
import (IOPRIO_CLASS_NONE
, IOPRIO_CLASS_RT
,
1053 IOPRIO_CLASS_BE
, IOPRIO_CLASS_IDLE
)
1054 self
.assertEqual(IOPRIO_CLASS_NONE
, 0)
1055 self
.assertEqual(IOPRIO_CLASS_RT
, 1)
1056 self
.assertEqual(IOPRIO_CLASS_BE
, 2)
1057 self
.assertEqual(IOPRIO_CLASS_IDLE
, 3)
1058 p
= psutil
.Process(os
.getpid())
1061 ioclass
, value
= p
.get_ionice()
1062 self
.assertEqual(ioclass
, 2)
1063 self
.assertEqual(value
, 4)
1066 ioclass
, value
= p
.get_ionice()
1067 self
.assertEqual(ioclass
, 3)
1068 self
.assertEqual(value
, 0)
1071 ioclass
, value
= p
.get_ionice()
1072 self
.assertEqual(ioclass
, 2)
1073 self
.assertEqual(value
, 0)
1075 ioclass
, value
= p
.get_ionice()
1076 self
.assertEqual(ioclass
, 2)
1077 self
.assertEqual(value
, 7)
1078 self
.assertRaises(ValueError, p
.set_ionice
, 2, 10)
1080 p
.set_ionice(IOPRIO_CLASS_NONE
)
1082 p
= psutil
.Process(os
.getpid())
1083 original
= p
.get_ionice()
1085 value
= 0 # very low
1086 if original
== value
:
1089 self
.assertEqual(p
.get_ionice(), value
)
1091 p
.set_ionice(original
)
1093 self
.assertRaises(ValueError, p
.set_ionice
, 3)
1094 self
.assertRaises(TypeError, p
.set_ionice
, 2, 1)
1096 def test_get_num_threads(self
):
1097 # on certain platforms such as Linux we might test for exact
1098 # thread number, since we always start with 1 thread per process,
1099 # but this does not apply across all platforms (OSX, Windows)
1100 p
= psutil
.Process(os
.getpid())
1101 step1
= p
.get_num_threads()
1103 thread
= ThreadTask()
1106 step2
= p
.get_num_threads()
1107 self
.assertEqual(step2
, step1
+ 1)
@unittest.skipUnless(WINDOWS, 'Windows only')
def test_get_num_handles(self):
    """The current process must report more than zero open handles."""
    # a more thorough check is done later in test/_windows.py
    handles = psutil.Process(os.getpid()).get_num_handles()
    self.assertGreater(handles, 0)
1119 def test_get_threads(self
):
1120 p
= psutil
.Process(os
.getpid())
1121 step1
= p
.get_threads()
1123 thread
= ThreadTask()
1127 step2
= p
.get_threads()
1128 self
.assertEqual(len(step2
), len(step1
) + 1)
1129 # on Linux, first thread id is supposed to be this process
1131 self
.assertEqual(step2
[0].id, os
.getpid())
1134 self
.assertEqual(athread
.id, athread
[0])
1135 self
.assertEqual(athread
.user_time
, athread
[1])
1136 self
.assertEqual(athread
.system_time
, athread
[2])
1143 def test_get_memory_info(self
):
1144 p
= psutil
.Process(os
.getpid())
1146 # step 1 - get a base value to compare our results
1147 rss1
, vms1
= p
.get_memory_info()
1148 percent1
= p
.get_memory_percent()
1149 self
.assertGreater(rss1
, 0)
1150 self
.assertGreater(vms1
, 0)
1152 # step 2 - allocate some memory
1153 memarr
= [None] * 1500000
1155 rss2
, vms2
= p
.get_memory_info()
1156 percent2
= p
.get_memory_percent()
1157 # make sure that the memory usage bumped up
1158 self
.assertGreater(rss2
, rss1
)
1159 self
.assertGreaterEqual(vms2
, vms1
) # vms might be equal
1160 self
.assertGreater(percent2
, percent1
)
1163 # def test_get_ext_memory_info(self):
1164 # # tested later in fetch all test suite
1166 def test_get_memory_maps(self
):
1167 p
= psutil
.Process(os
.getpid())
1168 maps
= p
.get_memory_maps()
1169 paths
= [x
for x
in maps
]
1170 self
.assertEqual(len(paths
), len(set(paths
)))
1171 ext_maps
= p
.get_memory_maps(grouped
=False)
1174 if not nt
.path
.startswith('['):
1175 assert os
.path
.isabs(nt
.path
), nt
.path
1177 assert os
.path
.exists(nt
.path
), nt
.path
1179 # XXX - On Windows we have this strange behavior with
1180 # 64 bit dlls: they are visible via explorer but cannot
1181 # be accessed via os.stat() (wtf?).
1182 if '64' not in os
.path
.basename(nt
.path
):
1183 assert os
.path
.exists(nt
.path
), nt
.path
1185 for fname
in nt
._fields
:
1186 value
= getattr(nt
, fname
)
1189 elif fname
in ('addr', 'perms'):
1192 self
.assertIsInstance(value
, (int, long))
1193 assert value
>= 0, value
def test_get_memory_percent(self):
    """The current process must report a memory percentage above zero."""
    this_proc = psutil.Process(os.getpid())
    percent = this_proc.get_memory_percent()
    self.assertGreater(percent, 0.0)
1200 sproc
= get_test_subprocess()
1201 self
.assertEqual(psutil
.Process(sproc
.pid
).pid
, sproc
.pid
)
1203 def test_is_running(self
):
1204 sproc
= get_test_subprocess(wait
=True)
1205 p
= psutil
.Process(sproc
.pid
)
1206 assert p
.is_running()
1207 assert p
.is_running()
1210 assert not p
.is_running()
1211 assert not p
.is_running()
1214 sproc
= get_test_subprocess(wait
=True)
1215 exe
= psutil
.Process(sproc
.pid
).exe
1217 self
.assertEqual(exe
, PYTHON
)
1218 except AssertionError:
1219 if WINDOWS
and len(exe
) == len(PYTHON
):
1220 # on Windows we don't care about case sensitivity
1221 self
.assertEqual(exe
.lower(), PYTHON
.lower())
1223 # certain platforms such as BSD are more accurate returning:
1224 # "/usr/local/bin/python2.7"
1226 # "/usr/local/bin/python"
1227 # We do not want to consider this difference in accuracy
1229 ver
= "%s.%s" % (sys
.version_info
[0], sys
.version_info
[1])
1230 self
.assertEqual(exe
.replace(ver
, ''), PYTHON
.replace(ver
, ''))
1232 def test_cmdline(self
):
1233 cmdline
= [PYTHON
, "-c", "import time; time.sleep(2)"]
1234 sproc
= get_test_subprocess(cmdline
, wait
=True)
1235 self
.assertEqual(' '.join(psutil
.Process(sproc
.pid
).cmdline
),
def test_name(self):
    """The lowercased process name must be a prefix of the lowercased
    file name of the python interpreter running the subprocess.
    """
    sproc = get_test_subprocess(PYTHON, wait=True)
    proc_name = psutil.Process(sproc.pid).name.lower()
    interpreter = os.path.realpath(sys.executable)
    pyexe = os.path.basename(interpreter).lower()
    assert pyexe.startswith(proc_name), (pyexe, proc_name)
@unittest.skipUnless(POSIX, 'posix only')
def test_uids(self):
    """Compare Process.uids of this process against the os module."""
    ruid, euid, suid = psutil.Process(os.getpid()).uids
    # "real" uid is what os.getuid() returns
    self.assertEqual(ruid, os.getuid())
    # "effective" uid is what os.geteuid() returns
    self.assertEqual(euid, os.geteuid())
    # there is no os.getsuid() for the "saved" uid; it is only
    # reachable through os.getresuid()[2] (python >= 2.7)
    if not hasattr(os, "getresuid"):
        return
    self.assertEqual(suid, os.getresuid()[2])
@unittest.skipUnless(POSIX, 'posix only')
def test_gids(self):
    """Compare Process.gids of this process against the os module.

    The real and effective gids are always checked; the saved gid is
    checked only where os.getresgid() exists.
    """
    p = psutil.Process(os.getpid())
    real, effective, saved = p.gids
    # os.getgid() refers to "real" gid
    self.assertEqual(real, os.getgid())
    # os.getegid() refers to "effective" gid
    self.assertEqual(effective, os.getegid())
    # no such thing as os.getsgid() ("saved" gid), but starting
    # from python 2.7 we have os.getresgid()[2]
    # guard on the function we actually call, not its uid counterpart
    if hasattr(os, "getresgid"):
        self.assertEqual(saved, os.getresgid()[2])
1270 def test_nice(self
):
1271 p
= psutil
.Process(os
.getpid())
1272 self
.assertRaises(TypeError, p
.set_nice
, "str")
1275 self
.assertEqual(p
.get_nice(), psutil
.NORMAL_PRIORITY_CLASS
)
1276 p
.set_nice(psutil
.HIGH_PRIORITY_CLASS
)
1277 self
.assertEqual(p
.get_nice(), psutil
.HIGH_PRIORITY_CLASS
)
1278 p
.set_nice(psutil
.NORMAL_PRIORITY_CLASS
)
1279 self
.assertEqual(p
.get_nice(), psutil
.NORMAL_PRIORITY_CLASS
)
1281 p
.set_nice(psutil
.NORMAL_PRIORITY_CLASS
)
1285 first_nice
= p
.get_nice()
1287 self
.assertEqual(p
.get_nice(), 1)
1288 # going back to previous nice value raises AccessDenied on OSX
1291 self
.assertEqual(p
.get_nice(), 0)
1292 except psutil
.AccessDenied
:
1296 p
.set_nice(first_nice
)
1297 except psutil
.AccessDenied
:
def test_status(self):
    """The current process must be reported as running, both as a
    STATUS_* constant and as a string.
    """
    this_proc = psutil.Process(os.getpid())
    self.assertEqual(this_proc.status, psutil.STATUS_RUNNING)
    self.assertEqual(str(this_proc.status), "running")
def test_status_constants(self):
    """STATUS_* constants must be comparable through both their numeric
    value and their str representation, with ``==`` as well as ``!=``.
    """
    running = psutil.STATUS_RUNNING
    # the `==` operator
    for same in (0, long(0), 'running'):
        self.assertTrue(running == same)
    for different in (1, 'sleeping'):
        self.assertFalse(running == different)
    # the `!=` operator
    for same in (0, 'running'):
        self.assertFalse(running != same)
    for different in (1, 'sleeping'):
        self.assertTrue(running != different)
1318 def test_username(self
):
1319 sproc
= get_test_subprocess()
1320 p
= psutil
.Process(sproc
.pid
)
1323 self
.assertEqual(p
.username
, pwd
.getpwuid(os
.getuid()).pw_name
)
1324 elif WINDOWS
and 'USERNAME' in os
.environ
:
1325 expected_username
= os
.environ
['USERNAME']
1326 expected_domain
= os
.environ
['USERDOMAIN']
1327 domain
, username
= p
.username
.split('\\')
1328 self
.assertEqual(domain
, expected_domain
)
1329 self
.assertEqual(username
, expected_username
)
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd(self):
    """The test subprocess must report the same working directory as
    the process running the test.
    """
    child = get_test_subprocess(wait=True)
    child_cwd = psutil.Process(child.pid).getcwd()
    self.assertEqual(child_cwd, os.getcwd())
@unittest.skipUnless(hasattr(psutil.Process, "getcwd"),
                     'not available on this platform')
def test_getcwd_2(self):
    """getcwd() of a subprocess which moved to the parent directory."""
    code = "import os, time; os.chdir('..'); time.sleep(2)"
    sproc = get_test_subprocess([PYTHON, "-c", code], wait=True)
    proc = psutil.Process(sproc.pid)
    # call_until() gets the bound method plus an expression comparing
    # its result with the parent of our own working directory
    expected = "ret == os.path.dirname(os.getcwd())"
    call_until(proc.getcwd, expected, timeout=1)
1348 @unittest.skipIf(not hasattr(psutil
.Process
, "get_cpu_affinity"),
1349 'not available on this platform')
1350 def test_cpu_affinity(self
):
1351 p
= psutil
.Process(os
.getpid())
1352 initial
= p
.get_cpu_affinity()
1353 all_cpus
= list(range(len(psutil
.cpu_percent(percpu
=True))))
1356 p
.set_cpu_affinity([n
])
1357 self
.assertEqual(p
.get_cpu_affinity(), [n
])
1359 p
.set_cpu_affinity(all_cpus
)
1360 self
.assertEqual(p
.get_cpu_affinity(), all_cpus
)
1362 p
.set_cpu_affinity(initial
)
1363 invalid_cpu
= [len(psutil
.cpu_times(percpu
=True)) + 10]
1364 self
.assertRaises(ValueError, p
.set_cpu_affinity
, invalid_cpu
)
1366 def test_get_open_files(self
):
1368 p
= psutil
.Process(os
.getpid())
1369 files
= p
.get_open_files()
1370 self
.assertFalse(TESTFN
in files
)
1371 f
= open(TESTFN
, 'w')
1372 call_until(p
.get_open_files
, "len(ret) != %i" % len(files
))
1373 filenames
= [x
.path
for x
in p
.get_open_files()]
1374 self
.assertIn(TESTFN
, filenames
)
1376 for file in filenames
:
1377 assert os
.path
.isfile(file), file
1380 cmdline
= "import time; f = open(r'%s', 'r'); time.sleep(2);" % TESTFN
1381 sproc
= get_test_subprocess([PYTHON
, "-c", cmdline
], wait
=True)
1382 p
= psutil
.Process(sproc
.pid
)
1383 for x
in range(100):
1384 filenames
= [x
.path
for x
in p
.get_open_files()]
1385 if TESTFN
in filenames
:
1389 self
.assertIn(TESTFN
, filenames
)
1390 for file in filenames
:
1391 assert os
.path
.isfile(file), file
1393 def test_get_open_files2(self
):
1394 # test fd and path fields
1395 fileobj
= open(TESTFN
, 'w')
1396 p
= psutil
.Process(os
.getpid())
1397 for path
, fd
in p
.get_open_files():
1398 if path
== fileobj
.name
or fd
== fileobj
.fileno():
1401 self
.fail("no file found; files=%s" % repr(p
.get_open_files()))
1402 self
.assertEqual(path
, fileobj
.name
)
1404 self
.assertEqual(fd
, -1)
1406 self
.assertEqual(fd
, fileobj
.fileno())
1408 ntuple
= p
.get_open_files()[0]
1409 self
.assertEqual(ntuple
[0], ntuple
.path
)
1410 self
.assertEqual(ntuple
[1], ntuple
.fd
)
1413 self
.assertTrue(fileobj
.name
not in p
.get_open_files())
1415 def test_connection_constants(self
):
1418 for name
in dir(psutil
):
1419 if name
.startswith('CONN_'):
1420 num
= getattr(psutil
, name
)
1422 assert str_
.isupper(), str_
1423 assert str_
not in strs
, str_
1424 assert num
not in ints
, num
1431 psutil
.CONN_DELETE_TCB
1433 def test_get_connections(self
):
1434 arg
= "import socket, time;" \
1435 "s = socket.socket();" \
1436 "s.bind(('127.0.0.1', 0));" \
1438 "conn, addr = s.accept();" \
1440 sproc
= get_test_subprocess([PYTHON
, "-c", arg
])
1441 p
= psutil
.Process(sproc
.pid
)
1442 for x
in range(100):
1443 if p
.get_connections():
1444 # give the subprocess some more time to bind()
1446 cons
= p
.get_connections()
1449 self
.assertEqual(len(cons
), 1)
1451 self
.assertEqual(con
.family
, socket
.AF_INET
)
1452 self
.assertEqual(con
.type, socket
.SOCK_STREAM
)
1453 self
.assertEqual(con
.status
, psutil
.CONN_LISTEN
, str(con
.status
))
1454 ip
, port
= con
.laddr
1455 self
.assertEqual(ip
, '127.0.0.1')
1456 self
.assertEqual(con
.raddr
, ())
1457 if WINDOWS
or SUNOS
:
1458 self
.assertEqual(con
.fd
, -1)
1460 assert con
.fd
> 0, con
1462 self
.assertEqual(con
[0], con
.fd
)
1463 self
.assertEqual(con
[1], con
.family
)
1464 self
.assertEqual(con
[2], con
.type)
1465 self
.assertEqual(con
[3], con
.laddr
)
1466 self
.assertEqual(con
[4], con
.raddr
)
1467 self
.assertEqual(con
[5], con
.status
)
1469 self
.assertRaises(ValueError, p
.get_connections
, 'foo')
1471 @unittest.skipUnless(supports_ipv6(), 'IPv6 is not supported')
1472 def test_get_connections_ipv6(self
):
1473 s
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
1476 cons
= psutil
.Process(os
.getpid()).get_connections()
1478 self
.assertEqual(len(cons
), 1)
1479 self
.assertEqual(cons
[0].laddr
[0], '::1')
1481 @unittest.skipUnless(hasattr(socket
, 'AF_UNIX'), 'AF_UNIX is not supported')
1482 def test_get_connections_unix(self
):
1485 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1487 conn
= psutil
.Process(os
.getpid()).get_connections(kind
='unix')[0]
1488 if conn
.fd
!= -1: # != sunos and windows
1489 self
.assertEqual(conn
.fd
, sock
.fileno())
1490 self
.assertEqual(conn
.family
, socket
.AF_UNIX
)
1491 self
.assertEqual(conn
.type, socket
.SOCK_STREAM
)
1492 self
.assertEqual(conn
.laddr
, TESTFN
)
1493 self
.assertTrue(not conn
.raddr
)
1494 self
.assertEqual(conn
.status
, psutil
.CONN_NONE
, str(conn
.status
))
1498 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_DGRAM
)
1500 conn
= psutil
.Process(os
.getpid()).get_connections(kind
='unix')[0]
1501 self
.assertEqual(conn
.type, socket
.SOCK_DGRAM
)
1504 @unittest.skipUnless(hasattr(socket
, "fromfd"),
1505 'socket.fromfd() is not availble')
1506 @unittest.skipIf(WINDOWS
or SUNOS
,
1507 'connection fd available on this platform')
1508 def test_connection_fromfd(self
):
1509 sock
= socket
.socket()
1510 sock
.bind(('localhost', 0))
1512 p
= psutil
.Process(os
.getpid())
1513 for conn
in p
.get_connections():
1514 if conn
.fd
== sock
.fileno():
1518 self
.fail("couldn't find socket fd")
1519 dupsock
= socket
.fromfd(conn
.fd
, conn
.family
, conn
.type)
1521 self
.assertEqual(dupsock
.getsockname(), conn
.laddr
)
1522 self
.assertNotEqual(sock
.fileno(), dupsock
.fileno())
1527 def test_get_connections_all(self
):
1528 tcp_template
= "import socket;" \
1529 "s = socket.socket($family, socket.SOCK_STREAM);" \
1530 "s.bind(('$addr', 0));" \
1532 "conn, addr = s.accept();"
1534 udp_template
= "import socket, time;" \
1535 "s = socket.socket($family, socket.SOCK_DGRAM);" \
1536 "s.bind(('$addr', 0));" \
1539 from string
import Template
1540 tcp4_template
= Template(tcp_template
).substitute(family
=socket
.AF_INET
,
1542 udp4_template
= Template(udp_template
).substitute(family
=socket
.AF_INET
,
1544 tcp6_template
= Template(tcp_template
).substitute(family
=socket
.AF_INET6
,
1546 udp6_template
= Template(udp_template
).substitute(family
=socket
.AF_INET6
,
1549 # launch various subprocess instantiating a socket of various
1550 # families and types to enrich psutil results
1551 tcp4_proc
= get_test_subprocess([PYTHON
, "-c", tcp4_template
])
1552 udp4_proc
= get_test_subprocess([PYTHON
, "-c", udp4_template
])
1554 tcp6_proc
= get_test_subprocess([PYTHON
, "-c", tcp6_template
])
1555 udp6_proc
= get_test_subprocess([PYTHON
, "-c", udp6_template
])
1560 # check matches against subprocesses just created
1561 all_kinds
= ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
1562 "udp", "udp4", "udp6")
1563 for p
in psutil
.Process(os
.getpid()).get_children():
1564 for conn
in p
.get_connections():
1566 if p
.pid
== tcp4_proc
.pid
:
1567 self
.assertEqual(conn
.family
, socket
.AF_INET
)
1568 self
.assertEqual(conn
.type, socket
.SOCK_STREAM
)
1569 self
.assertEqual(conn
.laddr
[0], "127.0.0.1")
1570 self
.assertEqual(conn
.raddr
, ())
1571 self
.assertEqual(conn
.status
, psutil
.CONN_LISTEN
,
1573 for kind
in all_kinds
:
1574 cons
= p
.get_connections(kind
=kind
)
1575 if kind
in ("all", "inet", "inet4", "tcp", "tcp4"):
1576 assert cons
!= [], cons
1578 self
.assertEqual(cons
, [], cons
)
1580 elif p
.pid
== udp4_proc
.pid
:
1581 self
.assertEqual(conn
.family
, socket
.AF_INET
)
1582 self
.assertEqual(conn
.type, socket
.SOCK_DGRAM
)
1583 self
.assertEqual(conn
.laddr
[0], "127.0.0.1")
1584 self
.assertEqual(conn
.raddr
, ())
1585 self
.assertEqual(conn
.status
, psutil
.CONN_NONE
,
1587 for kind
in all_kinds
:
1588 cons
= p
.get_connections(kind
=kind
)
1589 if kind
in ("all", "inet", "inet4", "udp", "udp4"):
1590 assert cons
!= [], cons
1592 self
.assertEqual(cons
, [], cons
)
1594 elif p
.pid
== getattr(tcp6_proc
, "pid", None):
1595 self
.assertEqual(conn
.family
, socket
.AF_INET6
)
1596 self
.assertEqual(conn
.type, socket
.SOCK_STREAM
)
1597 self
.assertIn(conn
.laddr
[0], ("::", "::1"))
1598 self
.assertEqual(conn
.raddr
, ())
1599 self
.assertEqual(conn
.status
, psutil
.CONN_LISTEN
,
1601 for kind
in all_kinds
:
1602 cons
= p
.get_connections(kind
=kind
)
1603 if kind
in ("all", "inet", "inet6", "tcp", "tcp6"):
1604 assert cons
!= [], cons
1606 self
.assertEqual(cons
, [], cons
)
1608 elif p
.pid
== getattr(udp6_proc
, "pid", None):
1609 self
.assertEqual(conn
.family
, socket
.AF_INET6
)
1610 self
.assertEqual(conn
.type, socket
.SOCK_DGRAM
)
1611 self
.assertIn(conn
.laddr
[0], ("::", "::1"))
1612 self
.assertEqual(conn
.raddr
, ())
1613 self
.assertEqual(conn
.status
, psutil
.CONN_NONE
,
1615 for kind
in all_kinds
:
1616 cons
= p
.get_connections(kind
=kind
)
1617 if kind
in ("all", "inet", "inet6", "udp", "udp6"):
1618 assert cons
!= [], cons
1620 self
.assertEqual(cons
, [], cons
)
1622 @unittest.skipUnless(POSIX
, 'posix only')
1623 def test_get_num_fds(self
):
1624 p
= psutil
.Process(os
.getpid())
1625 start
= p
.get_num_fds()
1626 file = open(TESTFN
, 'w')
1627 self
.assertEqual(p
.get_num_fds(), start
+ 1)
1628 sock
= socket
.socket()
1629 self
.assertEqual(p
.get_num_fds(), start
+ 2)
1632 self
.assertEqual(p
.get_num_fds(), start
)
1634 @skip_on_not_implemented(only_if
=LINUX
)
1635 def test_get_num_ctx_switches(self
):
1636 p
= psutil
.Process(os
.getpid())
1637 before
= sum(p
.get_num_ctx_switches())
1638 for x
in range(500000):
1639 after
= sum(p
.get_num_ctx_switches())
1642 self
.fail("num ctx switches still the same after 50.000 iterations")
1644 def test_parent_ppid(self
):
1645 this_parent
= os
.getpid()
1646 sproc
= get_test_subprocess()
1647 p
= psutil
.Process(sproc
.pid
)
1648 self
.assertEqual(p
.ppid
, this_parent
)
1649 self
.assertEqual(p
.parent
.pid
, this_parent
)
1650 # no other process is supposed to have us as parent
1651 for p
in psutil
.process_iter():
1652 if p
.pid
== sproc
.pid
:
1654 self
.assertTrue(p
.ppid
!= this_parent
)
def test_get_children(self):
    """get_children() must be empty before a subprocess is spawned and
    list exactly that subprocess afterwards, recursive or not.
    """
    me = psutil.Process(os.getpid())
    self.assertEqual(me.get_children(), [])
    self.assertEqual(me.get_children(recursive=True), [])
    sproc = get_test_subprocess()
    direct = me.get_children()
    recursive = me.get_children(recursive=True)
    for found in (direct, recursive):
        self.assertEqual(len(found), 1)
        only_child = found[0]
        self.assertEqual(only_child.pid, sproc.pid)
        self.assertEqual(only_child.ppid, os.getpid())
1668 def test_get_children_recursive(self
):
1669 # here we create a subprocess which creates another one as in:
1670 # A (parent) -> B (child) -> C (grandchild)
1671 s
= "import subprocess, os, sys, time;"
1672 s
+= "PYTHON = os.path.realpath(sys.executable);"
1673 s
+= "cmd = [PYTHON, '-c', 'import time; time.sleep(2);'];"
1674 s
+= "subprocess.Popen(cmd);"
1675 s
+= "time.sleep(2);"
1676 get_test_subprocess(cmd
=[PYTHON
, "-c", s
])
1677 p
= psutil
.Process(os
.getpid())
1678 self
.assertEqual(len(p
.get_children(recursive
=False)), 1)
1679 # give the grandchild some time to start
1680 stop_at
= time
.time() + 1.5
1681 while time
.time() < stop_at
:
1682 children
= p
.get_children(recursive
=True)
1683 if len(children
) > 1:
1685 self
.assertEqual(len(children
), 2)
1686 self
.assertEqual(children
[0].ppid
, os
.getpid())
1687 self
.assertEqual(children
[1].ppid
, children
[0].pid
)
1689 def test_get_children_duplicates(self
):
1690 # find the process which has the highest number of children
1691 from psutil
._compat
import defaultdict
1692 table
= defaultdict(int)
1693 for p
in psutil
.process_iter():
1696 except psutil
.Error
:
1698 # this is the one, now let's make sure there are no duplicates
1699 pid
= sorted(table
.items(), key
=lambda x
: x
[1])[-1][0]
1700 p
= psutil
.Process(pid
)
1702 c
= p
.get_children(recursive
=True)
1703 except psutil
.AccessDenied
: # windows
1706 self
.assertEqual(len(c
), len(set(c
)))
1708 def test_suspend_resume(self
):
1709 sproc
= get_test_subprocess(wait
=True)
1710 p
= psutil
.Process(sproc
.pid
)
1712 for x
in range(100):
1713 if p
.status
== psutil
.STATUS_STOPPED
:
1716 self
.assertEqual(str(p
.status
), "stopped")
1718 assert p
.status
!= psutil
.STATUS_STOPPED
, p
.status
def test_invalid_pid(self):
    """Process() must raise TypeError for a str or None pid and
    ValueError for a negative one.
    """
    for wrong_type in ("1", None):
        self.assertRaises(TypeError, psutil.Process, wrong_type)
    self.assertRaises(ValueError, psutil.Process, -1)
1725 def test_as_dict(self
):
1726 sproc
= get_test_subprocess()
1727 p
= psutil
.Process(sproc
.pid
)
1734 # dict is supposed to be hashable
1737 d
= p
.as_dict(attrs
=['exe', 'name'])
1738 self
.assertEqual(sorted(d
.keys()), ['exe', 'name'])
1740 p
= psutil
.Process(min(psutil
.get_pid_list()))
1741 d
= p
.as_dict(attrs
=['get_connections'], ad_value
='foo')
1742 if not isinstance(d
['connections'], list):
1743 self
.assertEqual(d
['connections'], 'foo')
1745 def test_zombie_process(self
):
1746 # Test that NoSuchProcess exception gets raised in case the
1747 # process dies after we create the Process object.
1749 # >>> proc = Process(1234)
1750 # >>> time.sleep(2) # time-consuming task, process dies in meantime
1752 # Refers to Issue #15
1753 sproc
= get_test_subprocess()
1754 p
= psutil
.Process(sproc
.pid
)
1759 if name
.startswith('_')\
1760 or name
in ('pid', 'send_signal', 'is_running', 'set_ionice',
1761 'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
1765 meth
= getattr(p
, name
)
1768 except psutil
.NoSuchProcess
:
1770 except NotImplementedError:
1773 self
.fail("NoSuchProcess exception not raised for %r" % name
)
1777 if os
.name
== 'posix':
1780 p
.set_nice(psutil
.NORMAL_PRIORITY_CLASS
)
1781 except psutil
.NoSuchProcess
:
1784 self
.fail("exception not raised")
1785 if hasattr(p
, 'set_ionice'):
1786 self
.assertRaises(psutil
.NoSuchProcess
, p
.set_ionice
, 2)
1787 self
.assertRaises(psutil
.NoSuchProcess
, p
.send_signal
, signal
.SIGTERM
)
1788 self
.assertRaises(psutil
.NoSuchProcess
, p
.set_nice
, 0)
1789 self
.assertFalse(p
.is_running())
1790 if hasattr(p
, "set_cpu_affinity"):
1791 self
.assertRaises(psutil
.NoSuchProcess
, p
.set_cpu_affinity
, [0])
1793 def test__str__(self
):
1794 sproc
= get_test_subprocess()
1795 p
= psutil
.Process(sproc
.pid
)
1796 self
.assertIn(str(sproc
.pid
), str(p
))
1797 # python shows up as 'Python' in cmdline on OS X so test fails on OS X
1799 self
.assertIn(os
.path
.basename(PYTHON
), str(p
))
1800 sproc
= get_test_subprocess()
1801 p
= psutil
.Process(sproc
.pid
)
1804 self
.assertIn(str(sproc
.pid
), str(p
))
1805 self
.assertIn("terminated", str(p
))
1807 @unittest.skipIf(LINUX
, 'PID 0 not available on Linux')
1808 def test_pid_0(self
):
1809 # Process(0) is supposed to work on all platforms except Linux
1810 p
= psutil
.Process(0)
1811 self
.assertTrue(p
.name
)
1813 if os
.name
== 'posix':
1815 self
.assertEqual(p
.uids
.real
, 0)
1816 self
.assertEqual(p
.gids
.real
, 0)
1817 except psutil
.AccessDenied
:
1820 self
.assertIn(p
.ppid
, (0, 1))
1821 #self.assertEqual(p.exe, "")
1825 except psutil
.AccessDenied
:
1830 except psutil
.AccessDenied
:
1836 self
.assertEqual(p
.username
, 'root')
1838 self
.assertEqual(p
.username
, 'NT AUTHORITY\\SYSTEM')
1841 except psutil
.AccessDenied
:
1844 self
.assertIn(0, psutil
.get_pid_list())
1845 self
.assertTrue(psutil
.pid_exists(0))
1847 def test__all__(self
):
1848 for name
in dir(psutil
):
1849 if name
in ('callable', 'defaultdict', 'error', 'namedtuple',
1852 if not name
.startswith('_'):
1856 if name
not in psutil
.__all
__:
1857 fun
= getattr(psutil
, name
)
1860 if 'deprecated' not in fun
.__doc
__.lower():
1861 self
.fail('%r not in psutil.__all__' % name
)
1863 def test_Popen(self
):
1865 # XXX this test causes a ResourceWarning on Python 3 because
1866 # psutil.__subproc instance doesn't get properly freed.
1867 # Not sure what to do though.
1868 cmd
= [PYTHON
, "-c", "import time; time.sleep(2);"]
1869 proc
= psutil
.Popen(cmd
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
1873 self
.assertTrue(hasattr(proc
, 'name'))
1874 self
.assertTrue(hasattr(proc
, 'stdin'))
1875 self
.assertRaises(AttributeError, getattr, proc
, 'foo')
1881 # ===================================================================
1882 # --- Fetch all processes test
1883 # ===================================================================
1885 class TestFetchAllProcesses(unittest
.TestCase
):
1886 # Iterates over all running processes and performs some sanity
1887 # checks against Process API's returned values.
1892 pall
= pwd
.getpwall()
1893 self
._uids
= set([x
.pw_uid
for x
in pall
])
1894 self
._usernames
= set([x
.pw_name
for x
in pall
])
1896 def test_fetch_all(self
):
1898 excluded_names
= ['send_signal', 'suspend', 'resume', 'terminate',
1899 'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice',
1900 'parent', 'get_children', 'pid']
1902 for name
in dir(psutil
.Process
):
1903 if name
.startswith("_"):
1905 if name
.startswith("set_"):
1907 if name
in excluded_names
:
1914 for p
in psutil
.process_iter():
1918 attr
= getattr(p
, name
, None)
1919 if attr
is not None and callable(attr
):
1924 except NotImplementedError:
1925 register_warning("%r was skipped because not "
1926 "implemented" % (self
.__class
__.__name
__ + \
1928 except (psutil
.NoSuchProcess
, psutil
.AccessDenied
):
1929 err
= sys
.exc_info()[1]
1930 if isinstance(err
, psutil
.NoSuchProcess
):
1931 if psutil
.pid_exists(p
.pid
):
1932 # XXX race condition; we probably need
1933 # to try figuring out the process
1934 # identity before failing
1935 self
.fail("PID still exists but fun raised " \
1937 self
.assertEqual(err
.pid
, p
.pid
)
1939 # make sure exception's name attr is set
1940 # with the actual process name
1941 self
.assertEqual(err
.name
, p
.name
)
1942 self
.assertTrue(str(err
))
1943 self
.assertTrue(err
.msg
)
1945 if ret
not in (0, 0.0, [], None, ''):
1947 meth
= getattr(self
, name
)
1950 err
= sys
.exc_info()[1]
1951 s
= '\n' + '=' * 70 + '\n'
1952 s
+= "FAIL: test_%s (proc=%s" % (name
, p
)
1954 s
+= ", ret=%s)" % repr(ret
)
1957 s
+= "\n%s" % traceback
.format_exc()
1958 s
= "\n".join((" " * 4) + i
for i
in s
.splitlines())
1963 self
.fail(''.join(failures
))
1965 # we should always have a non-empty list, not including PID 0 etc.
1967 self
.assertTrue(valid_procs
> 0)
1969 def cmdline(self
, ret
):
1974 self
.assertEqual(ret
, '')
1976 assert os
.path
.isabs(ret
), ret
1977 # Note: os.stat() may return False even if the file is there
1978 # hence we skip the test, see:
1979 # http://stackoverflow.com/questions/3112546/os-path-exists-lies
1981 assert os
.path
.isfile(ret
), ret
1982 if hasattr(os
, 'access') and hasattr(os
, "X_OK"):
1983 # XXX may fail on OSX
1984 self
.assertTrue(os
.access(ret
, os
.X_OK
))
def ppid(self, ret):
    """Sanity check for a ppid value: it must not be negative.

    ``ret`` is the value under test; the check fails with
    AssertionError, whose message includes the offending value.
    """
    self.assertGreaterEqual(ret, 0)
def name(self, ret):
    """Sanity check for a process name: a non-empty ``str``.

    ``ret`` is the value under test; a wrong type or an empty string
    fails the check with AssertionError.
    """
    self.assertIsInstance(ret, str)
    self.assertTrue(ret)
1993 def create_time(self
, ret
):
1994 self
.assertTrue(ret
> 0)
1995 # this can't be taken for granted on all platforms
1996 #self.assertGreaterEqual(ret, psutil.BOOT_TIME)
1997 # make sure returned value can be pretty printed
1999 time
.strftime("%Y %m %d %H:%M:%S", time
.localtime(ret
))
2001 def uids(self
, ret
):
2003 self
.assertTrue(uid
>= 0)
2004 self
.assertIn(uid
, self
._uids
)
2006 def gids(self
, ret
):
2007 # note: testing all gids as above seems not to be reliable for
2008 # gid == 30 (nobody); not sure why.
2010 self
.assertTrue(gid
>= 0)
2011 #self.assertIn(uid, self.gids)
def username(self, ret):
    """Sanity check for a username: it must be truthy and, on posix,
    one of the names collected in ``self._usernames``.
    """
    self.assertTrue(ret)
    if os.name != 'posix':
        return
    self.assertIn(ret, self._usernames)
def status(self, ret):
    """Sanity check for a process status value.

    ``ret`` must compare ``>= 0`` and its str form must not be '?'.
    Failures raise AssertionError and report the offending value.
    """
    self.assertGreaterEqual(ret, 0)
    self.assertNotEqual(str(ret), '?')
2022 def get_io_counters(self
, ret
):
2025 self
.assertTrue(field
>= 0)
2027 def get_ionice(self
, ret
):
2029 self
.assertTrue(ret
.ioclass
>= 0)
2030 self
.assertTrue(ret
.value
>= 0)
2032 self
.assertTrue(ret
>= 0)
2033 self
.assertIn(ret
, (0, 1, 2))
def get_num_threads(self, ret):
    """Sanity check for a thread count: it must be at least 1.

    ``ret`` is the value under test; the check fails with
    AssertionError, whose message includes the offending value.
    """
    self.assertGreaterEqual(ret, 1)
2038 def get_threads(self
, ret
):
2040 self
.assertTrue(t
.id >= 0)
2041 self
.assertTrue(t
.user_time
>= 0)
2042 self
.assertTrue(t
.system_time
>= 0)
def get_cpu_times(self, ret):
    """Sanity check for CPU times: ``user`` and ``system`` must not be
    negative.

    ``ret`` is an object exposing both attributes; failures raise
    AssertionError and report the offending value.
    """
    # checked in this order so the first bad field is the one reported
    for field in ('user', 'system'):
        self.assertGreaterEqual(getattr(ret, field), 0)
def get_memory_info(self, ret):
    """Sanity check for memory info: ``rss`` and ``vms`` must not be
    negative.

    ``ret`` is an object exposing both attributes; failures raise
    AssertionError and report the offending value.
    """
    # checked in this order so the first bad field is the one reported
    for field in ('rss', 'vms'):
        self.assertGreaterEqual(getattr(ret, field), 0)
2052 def get_ext_memory_info(self
, ret
):
2053 for name
in ret
._fields
:
2054 self
.assertTrue(getattr(ret
, name
) >= 0)
2055 if POSIX
and ret
.vms
!= 0:
2056 # VMS is always supposed to be the highest
2057 for name
in ret
._fields
:
2059 value
= getattr(ret
, name
)
2060 assert ret
.vms
> value
, ret
2062 assert ret
.peak_wset
>= ret
.wset
, ret
2063 assert ret
.peak_paged_pool
>= ret
.paged_pool
, ret
2064 assert ret
.peak_nonpaged_pool
>= ret
.nonpaged_pool
, ret
2065 assert ret
.peak_pagefile
>= ret
.pagefile
, ret
2067 def get_open_files(self
, ret
):
2070 assert f
.fd
== -1, f
2072 self
.assertIsInstance(f
.fd
, int)
2073 assert os
.path
.isabs(f
.path
), f
2074 assert os
.path
.isfile(f
.path
), f
def get_num_fds(self, ret):
    """Sanity check for a file descriptor count: it must not be
    negative.

    ``ret`` is the value under test; the check fails with
    AssertionError, whose message includes the offending value.
    """
    self.assertGreaterEqual(ret, 0)
2079 def get_connections(self
, ret
):
2080 # all values are supposed to match Linux's tcp_states.h states
2081 # table across all platforms.
2082 valid_conn_states
= ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
2083 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
2084 "LAST_ACK", "LISTEN", "CLOSING", "NONE"]
2086 valid_conn_states
+= ["IDLE", "BOUND"]
2088 valid_conn_states
+= ["DELETE_TCB"]
2090 self
.assertIn(conn
.type, (socket
.SOCK_STREAM
, socket
.SOCK_DGRAM
))
2091 self
.assertIn(conn
.family
, (socket
.AF_INET
, socket
.AF_INET6
))
2092 check_ip_address(conn
.laddr
, conn
.family
)
2093 check_ip_address(conn
.raddr
, conn
.family
)
2094 if conn
.status
not in valid_conn_states
:
2095 self
.fail("%s is not a valid status" % conn
.status
)
2096 # actually try to bind the local socket; ignore IPv6
2097 # sockets as their address might be represented as
2098 # an IPv4-mapped-address (e.g. "::127.0.0.1")
2099 # and that's rejected by bind()
2100 if conn
.family
== socket
.AF_INET
:
2101 s
= socket
.socket(conn
.family
, conn
.type)
2102 s
.bind((conn
.laddr
[0], 0))
2105 if not WINDOWS
and hasattr(socket
, 'fromfd'):
2109 dupsock
= socket
.fromfd(conn
.fd
, conn
.family
, conn
.type)
2110 except (socket
.error
, OSError):
2111 err
= sys
.exc_info()[1]
2112 if err
.args
[0] == errno
.EBADF
:
2116 if hasattr(dupsock
, "family"):
2117 self
.assertEqual(dupsock
.family
, conn
.family
)
2118 self
.assertEqual(dupsock
.type, conn
.type)
2120 if dupsock
is not None:
def getcwd(self, ret):
    """Validate a process' current working directory.

    `ret` may be None (BSD may return None), in which case nothing is
    checked. Otherwise it must be an absolute path and, provided it
    still exists, it must be a directory.
    """
    if ret is None:  # BSD may return None
        return
    assert os.path.isabs(ret), ret
    try:
        info = os.stat(ret)
    except OSError:
        exc = sys.exc_info()[1]
        # directory has been removed in mean time
        if exc.errno != errno.ENOENT:
            raise
    else:
        is_dir = stat.S_ISDIR(info.st_mode)
        self.assertTrue(is_dir)
def get_memory_percent(self, ret):
    """Check that the memory percentage lies between 0 and 100 inclusive."""
    within_bounds = 0 <= ret <= 100
    assert within_bounds, ret
def is_running(self, ret):
    """Check that the value returned by is_running() is true."""
    check = self.assertTrue
    check(ret)
def get_cpu_affinity(self, ret):
    """Check that the CPU affinity value is not an empty list."""
    differs_from_empty = ret != []
    assert differs_from_empty, ret
def terminal(self, ret):
    """Validate the terminal path associated with a process.

    None is accepted as-is; any other value must be an absolute path
    which exists on the file system.
    """
    if ret is None:
        return
    for predicate in (os.path.isabs, os.path.exists):
        assert predicate(ret), ret
def get_memory_maps(self, ret):
    """Validate each memory-map namedtuple contained in `ret`.

    Field by field:
     - 'path': must be absolute unless it starts with '['
     - 'addr' and 'perms': must be true values
     - anything else: must be a non-negative int/long
    """
    for region in ret:
        for field in region._fields:
            value = getattr(region, field)
            if field == 'path':
                # existence is deliberately not checked: on Linux we
                # might get '/foo/bar (deleted)'
                if not value.startswith('['):
                    assert os.path.isabs(region.path), region.path
                continue
            if field in ('addr', 'perms'):
                self.assertTrue(value)
                continue
            self.assertIsInstance(value, (int, long))
            assert value >= 0, value
def get_num_handles(self, ret):
    """Check that the number of handles is greater than or equal to 0.

    The same assertion applies to every platform, so no
    platform-specific branching is needed.
    """
    self.assertGreaterEqual(ret, 0)
def get_nice(self, ret):
    """Validate a process' niceness / priority value.

    On POSIX the value must lie between -20 and 20 inclusive. On other
    platforms it must equal one of the psutil attributes whose name
    ends with '_PRIORITY_CLASS'.
    """
    if not POSIX:
        priorities = []
        for name in dir(psutil):
            if name.endswith('_PRIORITY_CLASS'):
                priorities.append(getattr(psutil, name))
        self.assertIn(ret, priorities)
        return
    assert -20 <= ret <= 20, ret
def get_num_ctx_switches(self, ret):
    """Check that both context-switch counters are not negative."""
    voluntary_ok = ret.voluntary >= 0
    self.assertTrue(voluntary_ok)
    involuntary_ok = ret.involuntary >= 0
    self.assertTrue(involuntary_ok)
2184 # ===================================================================
2185 # --- Limited user tests
2186 # ===================================================================
if hasattr(os, 'getuid') and os.getuid() == 0:
    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who runs the test
        script is root.

        Every inherited test method is wrapped so that a
        psutil.AccessDenied exception is ignored instead of being
        reported as a failure.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            TestProcess.__init__(self, *args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # 'meth' is bound as a default argument so that each
                # wrapper keeps its own test method; a plain closure
                # would look the name up at call time and every
                # wrapper would end up calling the last method of the
                # loop.
                def test_(self, meth=meth):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            # drop privileges before each test
            os.setegid(1000)
            os.seteuid(1000)
            TestProcess.setUp(self)

        def tearDown(self):
            # restore the original effective group/user ids; the gid
            # goes to setegid() and the uid to seteuid()
            os.setegid(self.PROCESS_GID)
            os.seteuid(self.PROCESS_UID)
            TestProcess.tearDown(self)

        def test_nice(self):
            # a limited user is not supposed to be able to raise the
            # process priority
            try:
                psutil.Process(os.getpid()).set_nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")
2230 # ===================================================================
2231 # --- Example script tests
2232 # ===================================================================
class TestExampleScripts(unittest.TestCase):
    """Tests for scripts in the examples directory."""

    def assert_stdout(self, exe, args=None):
        """Run an example script and return its stripped output.

        The script is run with the current interpreter, `args` (if any)
        being appended to the command line. If the command fails with a
        RuntimeError mentioning 'AccessDenied' the error text is
        returned instead; any other RuntimeError is re-raised.
        The output is asserted not to be empty.
        """
        parts = [sys.executable, os.path.join(EXAMPLES_DIR, exe)]
        if args:
            parts.append(args)
        try:
            out = sh(' '.join(parts)).strip()
        except RuntimeError:
            err = sys.exc_info()[1]
            if 'AccessDenied' not in str(err):
                raise
            return str(err)
        assert out, out
        return out

    def assert_syntax(self, exe, args=None):
        """Parse an example script with ast to make sure it is valid
        Python source; the script is not executed.
        """
        script = os.path.join(EXAMPLES_DIR, exe)
        handle = open(script, 'r')
        try:
            source = handle.read()
        finally:
            handle.close()
        ast.parse(source)

    def test_check_presence(self):
        # make sure all example scripts have a test method defined
        available = dir(self)
        for name in os.listdir(EXAMPLES_DIR):
            if not name.endswith('.py'):
                continue
            expected = 'test_' + os.path.splitext(name)[0]
            if expected not in available:
                self.fail('no test defined for %r script'
                          % os.path.join(EXAMPLES_DIR, name))

    # --- scripts which are actually executed

    def test_disk_usage(self):
        self.assert_stdout('disk_usage.py')

    def test_free(self):
        self.assert_stdout('free.py')

    def test_meminfo(self):
        self.assert_stdout('meminfo.py')

    def test_process_detail(self):
        self.assert_stdout('process_detail.py')

    def test_who(self):
        self.assert_stdout('who.py')

    def test_netstat(self):
        self.assert_stdout('netstat.py')

    def test_pmap(self):
        self.assert_stdout('pmap.py', args=str(os.getpid()))

    # --- scripts which are only checked for syntax errors

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_killall(self):
        self.assert_syntax('killall.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_nettop(self):
        self.assert_syntax('nettop.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_top(self):
        self.assert_syntax('top.py')

    @unittest.skipIf(ast is None,
                     'ast module not available on this python version')
    def test_iotop(self):
        self.assert_syntax('iotop.py')
2314 reap_children(search_all
=True)
2318 atexit
.register(cleanup
)
def test_main():
    """Collect all test cases, run them and return True on success.

    Besides the generic test cases, the POSIX one (if on POSIX), the
    one specific to the current platform, LimitedUserTestCase (if it
    has been defined) and TestExampleScripts are run, in this order.
    """
    tests = [TestSystemAPIs, TestProcess, TestFetchAllProcesses]
    test_suite = unittest.TestSuite()

    if POSIX:
        from _posix import PosixSpecificTestCase
        tests.append(PosixSpecificTestCase)

    # import the specific platform test suite
    if LINUX:
        from _linux import LinuxSpecificTestCase as stc
    elif WINDOWS:
        from _windows import WindowsSpecificTestCase as stc
        from _windows import TestDualProcessImplementation
        tests.append(TestDualProcessImplementation)
    elif OSX:
        from _osx import OSXSpecificTestCase as stc
    elif BSD:
        from _bsd import BSDSpecificTestCase as stc
    elif SUNOS:
        from _sunos import SunOSSpecificTestCase as stc
    tests.append(stc)

    if hasattr(os, 'getuid'):
        # the class only exists if the suite is being run as root
        if 'LimitedUserTestCase' not in globals():
            register_warning("LimitedUserTestCase was skipped (super-user "
                             "privileges are required)")
        else:
            tests.append(LimitedUserTestCase)

    tests.append(TestExampleScripts)

    for test_class in tests:
        test_suite.addTest(unittest.makeSuite(test_class))
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(test_suite)
    return result.wasSuccessful()
2361 if __name__
== '__main__':