3 # $Id: test_psutil.py 1525 2012-08-16 16:32:03Z g.rodola $
5 # Copyright (c) 2009, Jay Loden, Giampaolo Rodola'. All rights reserved.
6 # Use of this source code is governed by a BSD-style license that can be
7 # found in the LICENSE file.
12 Note: this is targeted for both python 2.x and 3.x so there's no need
13 to use 2to3 tool first.
16 from __future__
import division
36 from psutil
._compat
import PY3
, callable, long
39 PYTHON
= os
.path
.realpath(sys
.executable
)
40 DEVNULL
= open(os
.devnull
, 'r+')
41 TESTFN
= os
.path
.join(os
.getcwd(), "$testfile")
42 POSIX
= os
.name
== 'posix'
43 LINUX
= sys
.platform
.startswith("linux")
44 WINDOWS
= sys
.platform
.startswith("win32")
45 OSX
= sys
.platform
.startswith("darwin")
46 BSD
= sys
.platform
.startswith("freebsd")
49 _subprocesses_started
= set()
51 def get_test_subprocess(cmd
=None, stdout
=DEVNULL
, stderr
=DEVNULL
, stdin
=None,
53 """Return a subprocess.Popen object to use in tests.
54 By default stdout and stderr are redirected to /dev/null and the
55 python interpreter is used as test process.
56 If 'wait' is True attemps to make sure the process is in a
57 reasonably initialized state.
62 pyline
+= "open('%s', 'w'); " % TESTFN
63 pyline
+= "import time; time.sleep(3600);"
64 cmd_
= [PYTHON
, "-c", pyline
]
67 sproc
= subprocess
.Popen(cmd_
, stdout
=stdout
, stderr
=stderr
, stdin
=stdin
)
70 stop_at
= time
.time() + 3
71 while time
.time() > stop_at
:
72 if os
.path
.exists(TESTFN
):
76 warn("couldn't make sure test file was actually created")
78 wait_for_pid(sproc
.pid
)
79 _subprocesses_started
.add(sproc
.pid
)
82 def warn(msg
, warntype
=RuntimeWarning):
83 """Raise a warning msg."""
84 warnings
.warn(msg
, warntype
)
86 def sh(cmdline
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
):
87 """run cmd in a subprocess and return its output.
88 raises RuntimeError on error.
90 p
= subprocess
.Popen(cmdline
, shell
=True, stdout
=stdout
, stderr
=stderr
)
91 stdout
, stderr
= p
.communicate()
93 raise RuntimeError(stderr
)
97 stdout
= str(stdout
, sys
.stdout
.encoding
)
101 """Same as UNIX which command. Return None on command not found."""
103 return os
.path
.isfile(fpath
) and os
.access(fpath
, os
.X_OK
)
105 fpath
, fname
= os
.path
.split(program
)
110 for path
in os
.environ
["PATH"].split(os
.pathsep
):
111 exe_file
= os
.path
.join(path
, program
)
116 def wait_for_pid(pid
, timeout
=1):
117 """Wait for pid to show up in the process list then return.
118 Used in the test suite to give time the sub process to initialize.
120 raise_at
= time
.time() + timeout
122 if pid
in psutil
.get_pid_list():
123 # give it one more iteration to allow full initialization
127 if time
.time() >= raise_at
:
128 raise RuntimeError("Timed out")
130 def reap_children(search_all
=False):
131 """Kill any subprocess started by this test suite and ensure that
132 no zombies stick around to hog resources and create problems when
133 looking for refleaks.
135 pids
= _subprocesses_started
137 this_process
= psutil
.Process(os
.getpid())
138 for p
in this_process
.get_children(recursive
=True):
143 child
= psutil
.Process(pid
)
145 except psutil
.NoSuchProcess
:
147 except psutil
.AccessDenied
:
148 warn("couldn't kill child process with pid %s" % pid
)
150 child
.wait(timeout
=3)
152 def check_ip_address(addr
, family
):
153 """Attempts to check IP address's validity."""
157 assert isinstance(port
, int), port
158 if family
== socket
.AF_INET
:
159 ip
= list(map(int, ip
.split('.')))
160 assert len(ip
) == 4, ip
162 assert 0 <= num
<= 255, ip
163 assert 0 <= port
<= 65535, port
165 def safe_remove(fname
):
171 def call_until(fun
, expr
, timeout
=1):
172 """Keep calling function for timeout secs and exit if eval()
175 stop_at
= time
.time() + timeout
176 while time
.time() < stop_at
:
181 raise RuntimeError('timed out (ret=%r)' % ret
)
184 def skipIf(condition
, reason
="", warn
=False):
185 """Decorator which skip a test under if condition is satisfied.
186 This is a substitute of unittest.skipIf which is available
187 only in python 2.7 and 3.2.
188 If 'reason' argument is provided this will be printed during
190 If 'warn' is provided a RuntimeWarning will be shown when all
193 def outer(fun
, *args
, **kwargs
):
196 sys
.stdout
.write("skipped-")
199 objname
= "%s.%s" % (self
.__class
__.__name
__, fun
.__name
__)
200 msg
= "%s was skipped" % objname
202 msg
+= "; reason: " + repr(reason
)
203 atexit
.register(warn
, msg
)
206 return fun(self
, *args
, **kwargs
)
210 def skipUnless(condition
, reason
="", warn
=False):
211 """Contrary of skipIf."""
213 return skipIf(True, reason
, warn
)
216 def ignore_access_denied(fun
):
217 """Decorator to Ignore AccessDenied exceptions."""
218 def outer(fun
, *args
, **kwargs
):
221 return fun(self
, *args
, **kwargs
)
222 except psutil
.AccessDenied
:
228 """Return True if IPv6 is supported on this platform."""
229 if not socket
.has_ipv6
or not hasattr(socket
, "AF_INET6"):
234 sock
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
235 sock
.bind(("::1", 0))
236 except (socket
.error
, socket
.gaierror
):
245 class ThreadTask(threading
.Thread
):
246 """A thread object used for running process thread tests."""
249 threading
.Thread
.__init
__(self
)
250 self
._running
= False
251 self
._interval
= None
252 self
._flag
= threading
.Event()
255 name
= self
.__class
__.__name
__
256 return '<%s running=%s at %#x>' % (name
, self
._running
, id(self
))
258 def start(self
, interval
=0.001):
259 """Start thread and keep it running until an explicit
260 stop() request. Polls for shutdown every 'timeout' seconds.
263 raise ValueError("already started")
264 self
._interval
= interval
265 threading
.Thread
.start(self
)
272 time
.sleep(self
._interval
)
275 """Stop thread execution and and waits until it is stopped."""
276 if not self
._running
:
277 raise ValueError("already stopped")
278 self
._running
= False
282 class TestCase(unittest
.TestCase
):
290 if not hasattr(unittest
.TestCase
, "assertIn"):
291 def assertIn(self
, member
, container
, msg
=None):
292 if member
not in container
:
294 '%s not found in %s' % (repr(member
), repr(container
)))
296 def assert_eq_w_tol(self
, first
, second
, tolerance
):
297 difference
= abs(first
- second
)
298 if difference
<= tolerance
:
300 msg
= '%r != %r within %r delta (%r difference)' \
301 % (first
, second
, tolerance
, difference
)
302 raise AssertionError(msg
)
304 # ============================
305 # tests for system-related API
306 # ============================
308 def test_process_iter(self
):
309 assert os
.getpid() in [x
.pid
for x
in psutil
.process_iter()]
310 sproc
= get_test_subprocess()
311 assert sproc
.pid
in [x
.pid
for x
in psutil
.process_iter()]
312 p
= psutil
.Process(sproc
.pid
)
315 assert sproc
.pid
not in [x
.pid
for x
in psutil
.process_iter()]
317 def test_TOTAL_PHYMEM(self
):
318 x
= psutil
.TOTAL_PHYMEM
319 assert isinstance(x
, (int, long))
321 self
.assertEqual(x
, psutil
.virtual_memory().total
)
323 def test_BOOT_TIME(self
):
325 assert isinstance(x
, float)
327 assert x
< time
.time(), x
329 def test_NUM_CPUS(self
):
330 self
.assertEqual(psutil
.NUM_CPUS
, len(psutil
.cpu_times(percpu
=True)))
331 assert psutil
.NUM_CPUS
>= 1, psutil
.NUM_CPUS
334 def test_PAGESIZE(self
):
335 # pagesize is used internally to perform different calculations
336 # and it's determined by using SC_PAGE_SIZE; make sure
337 # getpagesize() returns the same value.
339 self
.assertEqual(os
.sysconf("SC_PAGE_SIZE"), resource
.getpagesize())
341 def test_deprecated_apis(self
):
342 warnings
.filterwarnings("error")
343 p
= psutil
.Process(os
.getpid())
345 self
.assertRaises(DeprecationWarning, psutil
.virtmem_usage
)
346 self
.assertRaises(DeprecationWarning, psutil
.used_phymem
)
347 self
.assertRaises(DeprecationWarning, psutil
.avail_phymem
)
348 self
.assertRaises(DeprecationWarning, psutil
.total_virtmem
)
349 self
.assertRaises(DeprecationWarning, psutil
.used_virtmem
)
350 self
.assertRaises(DeprecationWarning, psutil
.avail_virtmem
)
351 self
.assertRaises(DeprecationWarning, psutil
.phymem_usage
)
352 self
.assertRaises(DeprecationWarning, psutil
.get_process_list
)
353 self
.assertRaises(DeprecationWarning, psutil
.get_process_list
)
355 self
.assertRaises(DeprecationWarning, psutil
.phymem_buffers
)
356 self
.assertRaises(DeprecationWarning, psutil
.cached_phymem
)
359 except DeprecationWarning:
362 self
.fail("p.nice didn't raise DeprecationWarning")
364 warnings
.resetwarnings()
366 def test_deprecated_apis_retval(self
):
367 warnings
.filterwarnings("ignore")
368 p
= psutil
.Process(os
.getpid())
370 self
.assertEqual(psutil
.total_virtmem(), psutil
.swap_memory().total
)
371 self
.assertEqual(psutil
.get_process_list(), list(psutil
.process_iter()))
372 self
.assertEqual(p
.nice
, p
.get_nice())
374 warnings
.resetwarnings()
376 def test_virtual_memory(self
):
377 mem
= psutil
.virtual_memory()
378 assert mem
.total
> 0, mem
379 assert mem
.available
> 0, mem
380 assert 0 <= mem
.percent
<= 100, mem
381 assert mem
.used
> 0, mem
382 assert mem
.free
>= 0, mem
383 for name
in mem
._fields
:
385 value
= getattr(mem
, name
)
387 self
.fail("%r < 0 (%s)" % (name
, value
))
388 if value
> mem
.total
:
389 self
.fail("%r > total (total=%s, %s=%s)" \
390 % (name
, mem
.total
, name
, value
))
392 def test_swap_memory(self
):
393 mem
= psutil
.swap_memory()
394 assert mem
.total
> 0, mem
395 assert mem
.used
>= 0, mem
396 assert mem
.free
> 0, mem
397 assert 0 <= mem
.percent
<= 100, mem
398 assert mem
.sin
>= 0, mem
399 assert mem
.sout
>= 0, mem
401 def test_pid_exists(self
):
402 sproc
= get_test_subprocess(wait
=True)
403 assert psutil
.pid_exists(sproc
.pid
)
404 p
= psutil
.Process(sproc
.pid
)
407 self
.assertFalse(psutil
.pid_exists(sproc
.pid
))
408 self
.assertFalse(psutil
.pid_exists(-1))
410 def test_pid_exists_2(self
):
412 pids
= psutil
.get_pid_list()
415 assert psutil
.pid_exists(pid
)
416 except AssertionError:
417 # in case the process disappeared in meantime fail only
418 # if it is no longer in get_pid_list()
420 if pid
in psutil
.get_pid_list():
422 pids
= range(max(pids
) + 5000, max(pids
) + 6000)
424 self
.assertFalse(psutil
.pid_exists(pid
))
426 def test_get_pid_list(self
):
427 plist
= [x
.pid
for x
in psutil
.process_iter()]
428 pidlist
= psutil
.get_pid_list()
429 self
.assertEqual(plist
.sort(), pidlist
.sort())
430 # make sure every pid is unique
431 self
.assertEqual(len(pidlist
), len(set(pidlist
)))
434 # test for psutil.test() function
442 def test_sys_cpu_times(self
):
444 times
= psutil
.cpu_times()
446 for cp_time
in times
:
447 assert isinstance(cp_time
, float)
448 assert cp_time
>= 0.0, cp_time
450 self
.assertEqual(total
, sum(times
))
453 def test_sys_cpu_times2(self
):
454 t1
= sum(psutil
.cpu_times())
456 t2
= sum(psutil
.cpu_times())
458 if not difference
>= 0.05:
459 self
.fail("difference %s" % difference
)
461 def test_sys_per_cpu_times(self
):
462 for times
in psutil
.cpu_times(percpu
=True):
465 for cp_time
in times
:
466 assert isinstance(cp_time
, float)
467 assert cp_time
>= 0.0, cp_time
469 self
.assertEqual(total
, sum(times
))
472 def test_sys_per_cpu_times2(self
):
473 tot1
= psutil
.cpu_times(percpu
=True)
474 stop_at
= time
.time() + 0.1
476 if time
.time() >= stop_at
:
478 tot2
= psutil
.cpu_times(percpu
=True)
479 for t1
, t2
in zip(tot1
, tot2
):
480 t1
, t2
= sum(t1
), sum(t2
)
482 if difference
>= 0.05:
486 def test_sys_cpu_percent(self
):
487 psutil
.cpu_percent(interval
=0.001)
488 psutil
.cpu_percent(interval
=0.001)
489 for x
in range(1000):
490 percent
= psutil
.cpu_percent(interval
=None)
491 assert isinstance(percent
, float)
492 assert percent
>= 0.0, percent
493 assert percent
<= 100.0, percent
495 def test_sys_per_cpu_percent(self
):
496 psutil
.cpu_percent(interval
=0.001, percpu
=True)
497 psutil
.cpu_percent(interval
=0.001, percpu
=True)
498 for x
in range(1000):
499 percents
= psutil
.cpu_percent(interval
=None, percpu
=True)
500 for percent
in percents
:
501 assert isinstance(percent
, float)
502 assert percent
>= 0.0, percent
503 assert percent
<= 100.0, percent
505 def test_disk_usage(self
):
506 usage
= psutil
.disk_usage(os
.getcwd())
507 assert usage
.total
> 0, usage
508 assert usage
.used
> 0, usage
509 assert usage
.free
> 0, usage
510 assert usage
.total
> usage
.used
, usage
511 assert usage
.total
> usage
.free
, usage
512 assert 0 <= usage
.percent
<= 100, usage
.percent
514 # if path does not exist OSError ENOENT is expected across
516 fname
= tempfile
.mktemp()
518 psutil
.disk_usage(fname
)
520 err
= sys
.exc_info()[1]
521 if err
.args
[0] != errno
.ENOENT
:
524 self
.fail("OSError not raised")
526 def test_disk_partitions(self
):
527 for disk
in psutil
.disk_partitions(all
=False):
528 assert os
.path
.exists(disk
.device
), disk
529 assert os
.path
.isdir(disk
.mountpoint
), disk
530 assert disk
.fstype
, disk
531 assert isinstance(disk
.opts
, str)
532 for disk
in psutil
.disk_partitions(all
=True):
535 os
.stat(disk
.mountpoint
)
537 # http://mail.python.org/pipermail/python-dev/2012-June/120787.html
538 err
= sys
.exc_info()[1]
539 if err
.errno
not in (errno
.EPERM
, errno
.EACCES
):
542 assert os
.path
.isdir(disk
.mountpoint
), disk
.mountpoint
543 assert isinstance(disk
.fstype
, str)
544 assert isinstance(disk
.opts
, str)
546 def find_mount_point(path
):
547 path
= os
.path
.abspath(path
)
548 while not os
.path
.ismount(path
):
549 path
= os
.path
.dirname(path
)
552 mount
= find_mount_point(__file__
)
553 mounts
= [x
.mountpoint
for x
in psutil
.disk_partitions(all
=True)]
554 self
.assertIn(mount
, mounts
)
555 psutil
.disk_usage(mount
)
557 def test_network_io_counters(self
):
558 def check_ntuple(nt
):
559 self
.assertEqual(nt
[0], nt
.bytes_sent
)
560 self
.assertEqual(nt
[1], nt
.bytes_recv
)
561 self
.assertEqual(nt
[2], nt
.packets_sent
)
562 self
.assertEqual(nt
[3], nt
.packets_recv
)
563 self
.assertEqual(nt
[4], nt
.errin
)
564 self
.assertEqual(nt
[5], nt
.errout
)
565 self
.assertEqual(nt
[6], nt
.dropin
)
566 self
.assertEqual(nt
[7], nt
.dropout
)
567 assert nt
.bytes_sent
>= 0, nt
568 assert nt
.bytes_recv
>= 0, nt
569 assert nt
.packets_sent
>= 0, nt
570 assert nt
.packets_recv
>= 0, nt
571 assert nt
.errin
>= 0, nt
572 assert nt
.errout
>= 0, nt
573 assert nt
.dropin
>= 0, nt
574 assert nt
.dropout
>= 0, nt
576 ret
= psutil
.network_io_counters(pernic
=False)
578 ret
= psutil
.network_io_counters(pernic
=True)
582 check_ntuple(ret
[key
])
584 def test_disk_io_counters(self
):
585 def check_ntuple(nt
):
586 self
.assertEqual(nt
[0], nt
.read_count
)
587 self
.assertEqual(nt
[1], nt
.write_count
)
588 self
.assertEqual(nt
[2], nt
.read_bytes
)
589 self
.assertEqual(nt
[3], nt
.write_bytes
)
590 self
.assertEqual(nt
[4], nt
.read_time
)
591 self
.assertEqual(nt
[5], nt
.write_time
)
592 assert nt
.read_count
>= 0, nt
593 assert nt
.write_count
>= 0, nt
594 assert nt
.read_bytes
>= 0, nt
595 assert nt
.write_bytes
>= 0, nt
596 assert nt
.read_time
>= 0, nt
597 assert nt
.write_time
>= 0, nt
599 ret
= psutil
.disk_io_counters(perdisk
=False)
601 ret
= psutil
.disk_io_counters(perdisk
=True)
604 check_ntuple(ret
[key
])
606 def test_get_users(self
):
607 users
= psutil
.get_users()
610 assert user
.name
, user
613 assert user
.started
> 0.0, user
614 datetime
.datetime
.fromtimestamp(user
.started
)
616 # ====================
617 # Process object tests
618 # ====================
621 sproc
= get_test_subprocess(wait
=True)
623 p
= psutil
.Process(test_pid
)
627 self
.assertFalse(psutil
.pid_exists(test_pid
) and name
== PYTHON
)
629 def test_terminate(self
):
630 sproc
= get_test_subprocess(wait
=True)
632 p
= psutil
.Process(test_pid
)
636 self
.assertFalse(psutil
.pid_exists(test_pid
) and name
== PYTHON
)
638 def test_send_signal(self
):
643 sproc
= get_test_subprocess()
645 p
= psutil
.Process(test_pid
)
649 self
.assertFalse(psutil
.pid_exists(test_pid
) and name
== PYTHON
)
652 # check exit code signal
653 sproc
= get_test_subprocess()
654 p
= psutil
.Process(sproc
.pid
)
657 if os
.name
== 'posix':
658 self
.assertEqual(code
, signal
.SIGKILL
)
660 self
.assertEqual(code
, 0)
661 self
.assertFalse(p
.is_running())
663 sproc
= get_test_subprocess()
664 p
= psutil
.Process(sproc
.pid
)
667 if os
.name
== 'posix':
668 self
.assertEqual(code
, signal
.SIGTERM
)
670 self
.assertEqual(code
, 0)
671 self
.assertFalse(p
.is_running())
673 # check sys.exit() code
674 code
= "import time, sys; time.sleep(0.01); sys.exit(5);"
675 sproc
= get_test_subprocess([PYTHON
, "-c", code
])
676 p
= psutil
.Process(sproc
.pid
)
677 self
.assertEqual(p
.wait(), 5)
678 self
.assertFalse(p
.is_running())
680 # Test wait() issued twice.
681 # It is not supposed to raise NSP when the process is gone.
682 # On UNIX this should return None, on Windows it should keep
683 # returning the exit code.
684 sproc
= get_test_subprocess([PYTHON
, "-c", code
])
685 p
= psutil
.Process(sproc
.pid
)
686 self
.assertEqual(p
.wait(), 5)
687 self
.assertIn(p
.wait(), (5, None))
690 sproc
= get_test_subprocess()
691 p
= psutil
.Process(sproc
.pid
)
693 self
.assertRaises(psutil
.TimeoutExpired
, p
.wait
, 0.01)
695 # timeout < 0 not allowed
696 self
.assertRaises(ValueError, p
.wait
, -1)
699 def test_wait_non_children(self
):
700 # test wait() against processes which are not our children
702 code
+= "from subprocess import Popen, PIPE;"
703 code
+= "cmd = ['%s', '-c', 'import time; time.sleep(10)'];" %PYTHON
704 code
+= "sp = Popen(cmd, stdout=PIPE);"
705 code
+= "sys.stdout.write(str(sp.pid));"
706 sproc
= get_test_subprocess([PYTHON
, "-c", code
], stdout
=subprocess
.PIPE
)
708 grandson_pid
= int(sproc
.stdout
.read())
709 grandson_proc
= psutil
.Process(grandson_pid
)
711 self
.assertRaises(psutil
.TimeoutExpired
, grandson_proc
.wait
, 0.01)
713 ret
= grandson_proc
.wait()
714 self
.assertEqual(ret
, None)
716 if grandson_proc
.is_running():
720 def test_wait_timeout_0(self
):
721 sproc
= get_test_subprocess()
722 p
= psutil
.Process(sproc
.pid
)
723 self
.assertRaises(psutil
.TimeoutExpired
, p
.wait
, 0)
725 stop_at
= time
.time() + 2
729 except psutil
.TimeoutExpired
:
730 if time
.time() >= stop_at
:
734 if os
.name
== 'posix':
735 self
.assertEqual(code
, signal
.SIGKILL
)
737 self
.assertEqual(code
, 0)
738 self
.assertFalse(p
.is_running())
740 def test_cpu_percent(self
):
741 p
= psutil
.Process(os
.getpid())
742 p
.get_cpu_percent(interval
=0.001)
743 p
.get_cpu_percent(interval
=0.001)
745 percent
= p
.get_cpu_percent(interval
=None)
746 assert isinstance(percent
, float)
747 assert percent
>= 0.0, percent
748 if os
.name
!= 'posix':
749 assert percent
<= 100.0, percent
751 assert percent
>= 0.0, percent
753 def test_cpu_times(self
):
754 times
= psutil
.Process(os
.getpid()).get_cpu_times()
755 assert (times
.user
> 0.0) or (times
.system
> 0.0), times
756 # make sure returned values can be pretty printed with strftime
757 time
.strftime("%H:%M:%S", time
.localtime(times
.user
))
758 time
.strftime("%H:%M:%S", time
.localtime(times
.system
))
760 # Test Process.cpu_times() against os.times()
761 # os.times() is broken on Python 2.6
762 # http://bugs.python.org/issue1040026
763 # XXX fails on OSX: not sure if it's for os.times(). We should
764 # try this with Python 2.7 and re-enable the test.
766 @skipUnless(sys
.version_info
> (2, 6, 1) and not OSX
)
767 def test_cpu_times2(self
):
768 user_time
, kernel_time
= psutil
.Process(os
.getpid()).get_cpu_times()
769 utime
, ktime
= os
.times()[:2]
771 # Use os.times()[:2] as base values to compare our results
772 # using a tolerance of +/- 0.1 seconds.
773 # It will fail if the difference between the values is > 0.1s.
774 if (max([user_time
, utime
]) - min([user_time
, utime
])) > 0.1:
775 self
.fail("expected: %s, found: %s" %(utime
, user_time
))
777 if (max([kernel_time
, ktime
]) - min([kernel_time
, ktime
])) > 0.1:
778 self
.fail("expected: %s, found: %s" %(ktime
, kernel_time
))
780 def test_create_time(self
):
781 sproc
= get_test_subprocess(wait
=True)
783 p
= psutil
.Process(sproc
.pid
)
784 create_time
= p
.create_time
786 # Use time.time() as base value to compare our result using a
787 # tolerance of +/- 1 second.
788 # It will fail if the difference between the values is > 2s.
789 difference
= abs(create_time
- now
)
791 self
.fail("expected: %s, found: %s, difference: %s"
792 % (now
, create_time
, difference
))
794 # make sure returned value can be pretty printed with strftime
795 time
.strftime("%Y %m %d %H:%M:%S", time
.localtime(p
.create_time
))
798 def test_terminal(self
):
800 p
= psutil
.Process(os
.getpid())
801 self
.assertEqual(p
.terminal
, tty
)
803 @skipIf(OSX
, warn
=False)
804 def test_get_io_counters(self
):
805 p
= psutil
.Process(os
.getpid())
807 io1
= p
.get_io_counters()
808 f
= open(PYTHON
, 'rb')
811 io2
= p
.get_io_counters()
813 assert io2
.read_count
> io1
.read_count
, (io1
, io2
)
814 self
.assertEqual(io2
.write_count
, io1
.write_count
)
815 assert io2
.read_bytes
>= io1
.read_bytes
, (io1
, io2
)
816 assert io2
.write_bytes
>= io1
.write_bytes
, (io1
, io2
)
818 io1
= p
.get_io_counters()
819 f
= tempfile
.TemporaryFile()
821 f
.write(bytes("x" * 1000000, 'ascii'))
823 f
.write("x" * 1000000)
825 io2
= p
.get_io_counters()
826 assert io2
.write_count
>= io1
.write_count
, (io1
, io2
)
827 assert io2
.write_bytes
>= io1
.write_bytes
, (io1
, io2
)
828 assert io2
.read_count
>= io1
.read_count
, (io1
, io2
)
829 assert io2
.read_bytes
>= io1
.read_bytes
, (io1
, io2
)
832 def test_get_set_ionice(self
):
833 from psutil
import (IOPRIO_CLASS_NONE
, IOPRIO_CLASS_RT
, IOPRIO_CLASS_BE
,
835 self
.assertEqual(IOPRIO_CLASS_NONE
, 0)
836 self
.assertEqual(IOPRIO_CLASS_RT
, 1)
837 self
.assertEqual(IOPRIO_CLASS_BE
, 2)
838 self
.assertEqual(IOPRIO_CLASS_IDLE
, 3)
839 p
= psutil
.Process(os
.getpid())
842 ioclass
, value
= p
.get_ionice()
843 self
.assertEqual(ioclass
, 2)
844 self
.assertEqual(value
, 4)
847 ioclass
, value
= p
.get_ionice()
848 self
.assertEqual(ioclass
, 3)
849 self
.assertEqual(value
, 0)
852 ioclass
, value
= p
.get_ionice()
853 self
.assertEqual(ioclass
, 2)
854 self
.assertEqual(value
, 0)
856 ioclass
, value
= p
.get_ionice()
857 self
.assertEqual(ioclass
, 2)
858 self
.assertEqual(value
, 7)
859 self
.assertRaises(ValueError, p
.set_ionice
, 2, 10)
861 p
.set_ionice(IOPRIO_CLASS_NONE
)
863 def test_get_num_threads(self
):
864 # on certain platforms such as Linux we might test for exact
865 # thread number, since we always have with 1 thread per process,
866 # but this does not apply across all platforms (OSX, Windows)
867 p
= psutil
.Process(os
.getpid())
868 step1
= p
.get_num_threads()
870 thread
= ThreadTask()
873 step2
= p
.get_num_threads()
874 self
.assertEqual(step2
, step1
+ 1)
881 def test_get_num_handles(self
):
882 # a better test is done later into test/_windows.py
883 p
= psutil
.Process(os
.getpid())
884 assert p
.get_num_handles() > 0
886 def test_get_threads(self
):
887 p
= psutil
.Process(os
.getpid())
888 step1
= p
.get_threads()
890 thread
= ThreadTask()
894 step2
= p
.get_threads()
895 self
.assertEqual(len(step2
), len(step1
) + 1)
896 # on Linux, first thread id is supposed to be this process
898 self
.assertEqual(step2
[0].id, os
.getpid())
901 self
.assertEqual(athread
.id, athread
[0])
902 self
.assertEqual(athread
.user_time
, athread
[1])
903 self
.assertEqual(athread
.system_time
, athread
[2])
910 def test_get_memory_info(self
):
911 p
= psutil
.Process(os
.getpid())
913 # step 1 - get a base value to compare our results
914 rss1
, vms1
= p
.get_memory_info()
915 percent1
= p
.get_memory_percent()
919 # step 2 - allocate some memory
920 memarr
= [None] * 1500000
922 rss2
, vms2
= p
.get_memory_info()
923 percent2
= p
.get_memory_percent()
924 # make sure that the memory usage bumped up
926 assert vms2
>= vms1
# vms might be equal
927 assert percent2
> percent1
930 # def test_get_ext_memory_info(self):
931 # # tested later in fetch all test suite
933 def test_get_memory_maps(self
):
934 p
= psutil
.Process(os
.getpid())
935 maps
= p
.get_memory_maps()
936 paths
= [x
for x
in maps
]
937 self
.assertEqual(len(paths
), len(set(paths
)))
938 ext_maps
= p
.get_memory_maps(grouped
=False)
941 if not nt
.path
.startswith('['):
942 assert os
.path
.isabs(nt
.path
), nt
.path
944 assert os
.path
.exists(nt
.path
), nt
.path
946 # XXX - On Windows we have this strange behavior with
947 # 64 bit dlls: they are visible via explorer but cannot
948 # be accessed via os.stat() (wtf?).
949 if '64' not in os
.path
.basename(nt
.path
):
950 assert os
.path
.exists(nt
.path
), nt
.path
952 for fname
in nt
._fields
:
953 value
= getattr(nt
, fname
)
956 elif fname
in ('addr', 'perms'):
959 assert isinstance(value
, (int, long))
960 assert value
>= 0, value
962 def test_get_memory_percent(self
):
963 p
= psutil
.Process(os
.getpid())
964 assert p
.get_memory_percent() > 0.0
967 sproc
= get_test_subprocess()
968 self
.assertEqual(psutil
.Process(sproc
.pid
).pid
, sproc
.pid
)
970 def test_is_running(self
):
971 sproc
= get_test_subprocess(wait
=True)
972 p
= psutil
.Process(sproc
.pid
)
973 assert p
.is_running()
974 assert p
.is_running()
977 assert not p
.is_running()
978 assert not p
.is_running()
981 sproc
= get_test_subprocess(wait
=True)
982 exe
= psutil
.Process(sproc
.pid
).exe
984 self
.assertEqual(exe
, PYTHON
)
985 except AssertionError:
986 # certain platforms such as BSD are more accurate returning:
987 # "/usr/local/bin/python2.7"
989 # "/usr/local/bin/python"
990 # We do not want to consider this difference in accuracy
992 ver
= "%s.%s" % (sys
.version_info
[0], sys
.version_info
[1])
993 self
.assertEqual(exe
.replace(ver
, ''), PYTHON
.replace(ver
, ''))
995 def test_cmdline(self
):
996 sproc
= get_test_subprocess([PYTHON
, "-E"], wait
=True)
997 self
.assertEqual(psutil
.Process(sproc
.pid
).cmdline
, [PYTHON
, "-E"])
1000 sproc
= get_test_subprocess(PYTHON
, wait
=True)
1001 pyexe
= os
.path
.basename(os
.path
.realpath(sys
.executable
)).lower()
1002 self
.assertEqual(psutil
.Process(sproc
.pid
).name
.lower(), pyexe
)
1004 if os
.name
== 'posix':
1006 def test_uids(self
):
1007 p
= psutil
.Process(os
.getpid())
1008 real
, effective
, saved
= p
.uids
1009 # os.getuid() refers to "real" uid
1010 self
.assertEqual(real
, os
.getuid())
1011 # os.geteuid() refers to "effective" uid
1012 self
.assertEqual(effective
, os
.geteuid())
1013 # no such thing as os.getsuid() ("saved" uid), but starting
1014 # from python 2.7 we have os.getresuid()[2]
1015 if hasattr(os
, "getresuid"):
1016 self
.assertEqual(saved
, os
.getresuid()[2])
1018 def test_gids(self
):
1019 p
= psutil
.Process(os
.getpid())
1020 real
, effective
, saved
= p
.gids
1021 # os.getuid() refers to "real" uid
1022 self
.assertEqual(real
, os
.getgid())
1023 # os.geteuid() refers to "effective" uid
1024 self
.assertEqual(effective
, os
.getegid())
1025 # no such thing as os.getsuid() ("saved" uid), but starting
1026 # from python 2.7 we have os.getresgid()[2]
1027 if hasattr(os
, "getresuid"):
1028 self
.assertEqual(saved
, os
.getresgid()[2])
1030 def test_nice(self
):
1031 p
= psutil
.Process(os
.getpid())
1032 self
.assertRaises(TypeError, p
.set_nice
, "str")
1035 first_nice
= p
.get_nice()
1037 self
.assertEqual(p
.get_nice(), 1)
1038 # going back to previous nice value raises AccessDenied on OSX
1041 self
.assertEqual(p
.get_nice(), 0)
1042 except psutil
.AccessDenied
:
1046 p
.set_nice(first_nice
)
1047 except psutil
.AccessDenied
:
1052 def test_nice(self
):
1053 p
= psutil
.Process(os
.getpid())
1054 self
.assertRaises(TypeError, p
.set_nice
, "str")
1056 self
.assertEqual(p
.get_nice(), psutil
.NORMAL_PRIORITY_CLASS
)
1057 p
.set_nice(psutil
.HIGH_PRIORITY_CLASS
)
1058 self
.assertEqual(p
.get_nice(), psutil
.HIGH_PRIORITY_CLASS
)
1059 p
.set_nice(psutil
.NORMAL_PRIORITY_CLASS
)
1060 self
.assertEqual(p
.get_nice(), psutil
.NORMAL_PRIORITY_CLASS
)
1062 p
.set_nice(psutil
.NORMAL_PRIORITY_CLASS
)
1064 def test_status(self
):
1065 p
= psutil
.Process(os
.getpid())
1066 self
.assertEqual(p
.status
, psutil
.STATUS_RUNNING
)
1067 self
.assertEqual(str(p
.status
), "running")
1069 def test_status_constants(self
):
1070 # STATUS_* constants are supposed to be comparable also by
1071 # using their str representation
1072 self
.assertTrue(psutil
.STATUS_RUNNING
== 0)
1073 self
.assertTrue(psutil
.STATUS_RUNNING
== long(0))
1074 self
.assertTrue(psutil
.STATUS_RUNNING
== 'running')
1075 self
.assertFalse(psutil
.STATUS_RUNNING
== 1)
1076 self
.assertFalse(psutil
.STATUS_RUNNING
== 'sleeping')
1077 self
.assertFalse(psutil
.STATUS_RUNNING
!= 0)
1078 self
.assertFalse(psutil
.STATUS_RUNNING
!= 'running')
1079 self
.assertTrue(psutil
.STATUS_RUNNING
!= 1)
1080 self
.assertTrue(psutil
.STATUS_RUNNING
!= 'sleeping')
1082 def test_username(self
):
1083 sproc
= get_test_subprocess()
1084 p
= psutil
.Process(sproc
.pid
)
1087 self
.assertEqual(p
.username
, pwd
.getpwuid(os
.getuid()).pw_name
)
1089 expected_username
= os
.environ
['USERNAME']
1090 expected_domain
= os
.environ
['USERDOMAIN']
1091 domain
, username
= p
.username
.split('\\')
1092 self
.assertEqual(domain
, expected_domain
)
1093 self
.assertEqual(username
, expected_username
)
1097 @skipIf(not hasattr(psutil
.Process
, "getcwd"))
1098 def test_getcwd(self
):
1099 sproc
= get_test_subprocess(wait
=True)
1100 p
= psutil
.Process(sproc
.pid
)
1101 self
.assertEqual(p
.getcwd(), os
.getcwd())
1103 @skipIf(not hasattr(psutil
.Process
, "getcwd"))
1104 def test_getcwd_2(self
):
1105 cmd
= [PYTHON
, "-c", "import os, time; os.chdir('..'); time.sleep(10)"]
1106 sproc
= get_test_subprocess(cmd
, wait
=True)
1107 p
= psutil
.Process(sproc
.pid
)
1108 call_until(p
.getcwd
, "ret == os.path.dirname(os.getcwd())", timeout
=1)
1110 @skipIf(not hasattr(psutil
.Process
, "get_cpu_affinity"))
1111 def test_cpu_affinity(self
):
1112 p
= psutil
.Process(os
.getpid())
1113 initial
= p
.get_cpu_affinity()
1114 all_cpus
= list(range(len(psutil
.cpu_percent(percpu
=True))))
1117 p
.set_cpu_affinity([n
])
1118 self
.assertEqual(p
.get_cpu_affinity(), [n
])
1120 p
.set_cpu_affinity(all_cpus
)
1121 self
.assertEqual(p
.get_cpu_affinity(), all_cpus
)
1123 p
.set_cpu_affinity(initial
)
1124 invalid_cpu
= [len(psutil
.cpu_times(percpu
=True)) + 10]
1125 self
.assertRaises(ValueError, p
.set_cpu_affinity
, invalid_cpu
)
1127 def test_get_open_files(self
):
1129 p
= psutil
.Process(os
.getpid())
1130 files
= p
.get_open_files()
1131 self
.assertFalse(TESTFN
in files
)
1132 f
= open(TESTFN
, 'w')
1133 call_until(p
.get_open_files
, "len(ret) != %i" % len(files
))
1134 filenames
= [x
.path
for x
in p
.get_open_files()]
1135 self
.assertIn(TESTFN
, filenames
)
1137 for file in filenames
:
1138 assert os
.path
.isfile(file), file
1141 cmdline
= "import time; f = open(r'%s', 'r'); time.sleep(100);" % TESTFN
1142 sproc
= get_test_subprocess([PYTHON
, "-c", cmdline
], wait
=True)
1143 p
= psutil
.Process(sproc
.pid
)
1144 for x
in range(100):
1145 filenames
= [x
.path
for x
in p
.get_open_files()]
1146 if TESTFN
in filenames
:
1150 self
.assertIn(TESTFN
, filenames
)
1151 for file in filenames
:
1152 assert os
.path
.isfile(file), file
1154 def test_get_open_files2(self
):
1155 # test fd and path fields
1156 fileobj
= open(TESTFN
, 'w')
1157 p
= psutil
.Process(os
.getpid())
1158 for path
, fd
in p
.get_open_files():
1159 if path
== fileobj
.name
or fd
== fileobj
.fileno():
1162 self
.fail("no file found; files=%s" % repr(p
.get_open_files()))
1163 self
.assertEqual(path
, fileobj
.name
)
1165 self
.assertEqual(fd
, -1)
1167 self
.assertEqual(fd
, fileobj
.fileno())
1169 ntuple
= p
.get_open_files()[0]
1170 self
.assertEqual(ntuple
[0], ntuple
.path
)
1171 self
.assertEqual(ntuple
[1], ntuple
.fd
)
1174 self
.assertTrue(fileobj
.name
not in p
.get_open_files())
1176 def test_get_connections(self
):
1177 arg
= "import socket, time;" \
1178 "s = socket.socket();" \
1179 "s.bind(('127.0.0.1', 0));" \
1181 "conn, addr = s.accept();" \
1183 sproc
= get_test_subprocess([PYTHON
, "-c", arg
])
1184 p
= psutil
.Process(sproc
.pid
)
1185 for x
in range(100):
1186 cons
= p
.get_connections()
1190 self
.assertEqual(len(cons
), 1)
1192 self
.assertEqual(con
.family
, socket
.AF_INET
)
1193 self
.assertEqual(con
.type, socket
.SOCK_STREAM
)
1194 self
.assertEqual(con
.status
, "LISTEN")
1195 ip
, port
= con
.local_address
1196 self
.assertEqual(ip
, '127.0.0.1')
1197 self
.assertEqual(con
.remote_address
, ())
1199 self
.assertEqual(con
.fd
, -1)
1201 assert con
.fd
> 0, con
1203 self
.assertEqual(con
[0], con
.fd
)
1204 self
.assertEqual(con
[1], con
.family
)
1205 self
.assertEqual(con
[2], con
.type)
1206 self
.assertEqual(con
[3], con
.local_address
)
1207 self
.assertEqual(con
[4], con
.remote_address
)
1208 self
.assertEqual(con
[5], con
.status
)
1210 self
.assertRaises(ValueError, p
.get_connections
, 'foo')
1212 @skipUnless(supports_ipv6())
1213 def test_get_connections_ipv6(self
):
1214 s
= socket
.socket(socket
.AF_INET6
, socket
.SOCK_STREAM
)
1217 cons
= psutil
.Process(os
.getpid()).get_connections()
1219 self
.assertEqual(len(cons
), 1)
1220 self
.assertEqual(cons
[0].local_address
[0], '::1')
1222 @skipUnless(hasattr(socket
, 'AF_UNIX'))
1223 def test_get_connections_unix(self
):
1226 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_STREAM
)
1228 conn
= psutil
.Process(os
.getpid()).get_connections(kind
='unix')[0]
1229 self
.assertEqual(conn
.fd
, sock
.fileno())
1230 self
.assertEqual(conn
.family
, socket
.AF_UNIX
)
1231 self
.assertEqual(conn
.type, socket
.SOCK_STREAM
)
1232 self
.assertEqual(conn
.local_address
, TESTFN
)
1233 self
.assertTrue(not conn
.remote_address
)
1234 self
.assertTrue(not conn
.status
)
1238 sock
= socket
.socket(socket
.AF_UNIX
, socket
.SOCK_DGRAM
)
1240 conn
= psutil
.Process(os
.getpid()).get_connections(kind
='unix')[0]
1241 self
.assertEqual(conn
.type, socket
.SOCK_DGRAM
)
1244 @skipUnless(hasattr(socket
, "fromfd") and not WINDOWS
)
1245 def test_connection_fromfd(self
):
1246 sock
= socket
.socket()
1247 sock
.bind(('localhost', 0))
1249 p
= psutil
.Process(os
.getpid())
1250 for conn
in p
.get_connections():
1251 if conn
.fd
== sock
.fileno():
1255 self
.fail("couldn't find socket fd")
1256 dupsock
= socket
.fromfd(conn
.fd
, conn
.family
, conn
.type)
1258 self
.assertEqual(dupsock
.getsockname(), conn
.local_address
)
1259 self
.assertNotEqual(sock
.fileno(), dupsock
.fileno())
1264 def test_get_connections_all(self
):
1265 tcp_template
= "import socket;" \
1266 "s = socket.socket($family, socket.SOCK_STREAM);" \
1267 "s.bind(('$addr', 0));" \
1269 "conn, addr = s.accept();"
1271 udp_template
= "import socket, time;" \
1272 "s = socket.socket($family, socket.SOCK_DGRAM);" \
1273 "s.bind(('$addr', 0));" \
1276 from string
import Template
1277 tcp4_template
= Template(tcp_template
).substitute(family
=socket
.AF_INET
,
1279 udp4_template
= Template(udp_template
).substitute(family
=socket
.AF_INET
,
1281 tcp6_template
= Template(tcp_template
).substitute(family
=socket
.AF_INET6
,
1283 udp6_template
= Template(udp_template
).substitute(family
=socket
.AF_INET6
,
1286 # launch various subprocess instantiating a socket of various
1287 # families and types to enrich psutil results
1288 tcp4_proc
= get_test_subprocess([PYTHON
, "-c", tcp4_template
])
1289 udp4_proc
= get_test_subprocess([PYTHON
, "-c", udp4_template
])
1291 tcp6_proc
= get_test_subprocess([PYTHON
, "-c", tcp6_template
])
1292 udp6_proc
= get_test_subprocess([PYTHON
, "-c", udp6_template
])
1297 # check matches against subprocesses just created
1298 all_kinds
= ("all", "inet", "inet4", "inet6", "tcp", "tcp4", "tcp6",
1299 "udp", "udp4", "udp6")
1300 for p
in psutil
.Process(os
.getpid()).get_children():
1301 for conn
in p
.get_connections():
1303 if p
.pid
== tcp4_proc
.pid
:
1304 self
.assertEqual(conn
.family
, socket
.AF_INET
)
1305 self
.assertEqual(conn
.type, socket
.SOCK_STREAM
)
1306 self
.assertEqual(conn
.local_address
[0], "127.0.0.1")
1307 self
.assertEqual(conn
.remote_address
, ())
1308 self
.assertEqual(conn
.status
, "LISTEN")
1309 for kind
in all_kinds
:
1310 cons
= p
.get_connections(kind
=kind
)
1311 if kind
in ("all", "inet", "inet4", "tcp", "tcp4"):
1312 assert cons
!= [], cons
1314 self
.assertEqual(cons
, [], cons
)
1316 elif p
.pid
== udp4_proc
.pid
:
1317 self
.assertEqual(conn
.family
, socket
.AF_INET
)
1318 self
.assertEqual(conn
.type, socket
.SOCK_DGRAM
)
1319 self
.assertEqual(conn
.local_address
[0], "127.0.0.1")
1320 self
.assertEqual(conn
.remote_address
, ())
1321 self
.assertEqual(conn
.status
, "")
1322 for kind
in all_kinds
:
1323 cons
= p
.get_connections(kind
=kind
)
1324 if kind
in ("all", "inet", "inet4", "udp", "udp4"):
1325 assert cons
!= [], cons
1327 self
.assertEqual(cons
, [], cons
)
1329 elif p
.pid
== getattr(tcp6_proc
, "pid", None):
1330 self
.assertEqual(conn
.family
, socket
.AF_INET6
)
1331 self
.assertEqual(conn
.type, socket
.SOCK_STREAM
)
1332 self
.assertIn(conn
.local_address
[0], ("::", "::1"))
1333 self
.assertEqual(conn
.remote_address
, ())
1334 self
.assertEqual(conn
.status
, "LISTEN")
1335 for kind
in all_kinds
:
1336 cons
= p
.get_connections(kind
=kind
)
1337 if kind
in ("all", "inet", "inet6", "tcp", "tcp6"):
1338 assert cons
!= [], cons
1340 self
.assertEqual(cons
, [], cons
)
1342 elif p
.pid
== getattr(udp6_proc
, "pid", None):
1343 self
.assertEqual(conn
.family
, socket
.AF_INET6
)
1344 self
.assertEqual(conn
.type, socket
.SOCK_DGRAM
)
1345 self
.assertIn(conn
.local_address
[0], ("::", "::1"))
1346 self
.assertEqual(conn
.remote_address
, ())
1347 self
.assertEqual(conn
.status
, "")
1348 for kind
in all_kinds
:
1349 cons
= p
.get_connections(kind
=kind
)
1350 if kind
in ("all", "inet", "inet6", "udp", "udp6"):
1351 assert cons
!= [], cons
1353 self
.assertEqual(cons
, [], cons
)
1356 def test_get_num_fds(self
):
1357 p
= psutil
.Process(os
.getpid())
1358 start
= p
.get_num_fds()
1359 file = open(TESTFN
, 'w')
1360 self
.assertEqual(p
.get_num_fds(), start
+ 1)
1361 sock
= socket
.socket()
1362 self
.assertEqual(p
.get_num_fds(), start
+ 2)
1365 self
.assertEqual(p
.get_num_fds(), start
)
1367 def test_get_num_ctx_switches(self
):
1368 p
= psutil
.Process(os
.getpid())
1369 before
= sum(p
.get_num_ctx_switches())
1370 for x
in range(50000):
1371 after
= sum(p
.get_num_ctx_switches())
1374 self
.fail("num ctx switches still the same after 50.000 iterations")
1376 def test_parent_ppid(self
):
1377 this_parent
= os
.getpid()
1378 sproc
= get_test_subprocess()
1379 p
= psutil
.Process(sproc
.pid
)
1380 self
.assertEqual(p
.ppid
, this_parent
)
1381 self
.assertEqual(p
.parent
.pid
, this_parent
)
1382 # no other process is supposed to have us as parent
1383 for p
in psutil
.process_iter():
1384 if p
.pid
== sproc
.pid
:
1386 self
.assertTrue(p
.ppid
!= this_parent
)
1388 def test_get_children(self
):
1389 p
= psutil
.Process(os
.getpid())
1390 self
.assertEqual(p
.get_children(), [])
1391 self
.assertEqual(p
.get_children(recursive
=True), [])
1392 sproc
= get_test_subprocess()
1393 children1
= p
.get_children()
1394 children2
= p
.get_children(recursive
=True)
1395 for children
in (children1
, children2
):
1396 self
.assertEqual(len(children
), 1)
1397 self
.assertEqual(children
[0].pid
, sproc
.pid
)
1398 self
.assertEqual(children
[0].ppid
, os
.getpid())
1400 def test_get_children_recursive(self
):
1401 # here we create a subprocess which creates another one as in:
1402 # A (parent) -> B (child) -> C (grandchild)
1403 s
= "import subprocess, os, sys, time;"
1404 s
+= "PYTHON = os.path.realpath(sys.executable);"
1405 s
+= "cmd = [PYTHON, '-c', 'import time; time.sleep(3600);'];"
1406 s
+= "subprocess.Popen(cmd);"
1407 s
+= "time.sleep(3600);"
1408 get_test_subprocess(cmd
=[PYTHON
, "-c", s
])
1409 p
= psutil
.Process(os
.getpid())
1410 self
.assertEqual(len(p
.get_children(recursive
=False)), 1)
1411 # give the grandchild some time to start
1412 stop_at
= time
.time() + 1.5
1413 while time
.time() < stop_at
:
1414 children
= p
.get_children(recursive
=True)
1415 if len(children
) > 1:
1417 self
.assertEqual(len(children
), 2)
1418 self
.assertEqual(children
[0].ppid
, os
.getpid())
1419 self
.assertEqual(children
[1].ppid
, children
[0].pid
)
1421 def test_get_children_duplicates(self
):
1422 # find the process which has the highest number of children
1423 from psutil
._compat
import defaultdict
1424 table
= defaultdict(int)
1425 for p
in psutil
.process_iter():
1428 except psutil
.Error
:
1430 # this is the one, now let's make sure there are no duplicates
1431 pid
= max(sorted(table
, key
=lambda x
: table
[x
]))
1432 p
= psutil
.Process(pid
)
1434 c
= p
.get_children(recursive
=True)
1435 except psutil
.AccessDenied
: # windows
1438 self
.assertEqual(len(c
), len(set(c
)))
1440 def test_suspend_resume(self
):
1441 sproc
= get_test_subprocess(wait
=True)
1442 p
= psutil
.Process(sproc
.pid
)
1444 for x
in range(100):
1445 if p
.status
== psutil
.STATUS_STOPPED
:
1448 self
.assertEqual(str(p
.status
), "stopped")
1450 assert p
.status
!= psutil
.STATUS_STOPPED
, p
.status
1452 def test_invalid_pid(self
):
1453 self
.assertRaises(TypeError, psutil
.Process
, "1")
1454 self
.assertRaises(TypeError, psutil
.Process
, None)
1455 # Refers to Issue #12
1456 self
.assertRaises(psutil
.NoSuchProcess
, psutil
.Process
, -1)
1458 def test_as_dict(self
):
1459 sproc
= get_test_subprocess()
1460 p
= psutil
.Process(sproc
.pid
)
1467 # dict is supposed to be hashable
1470 d
= p
.as_dict(attrs
=['exe', 'name'])
1471 self
.assertEqual(sorted(d
.keys()), ['exe', 'name'])
1473 p
= psutil
.Process(min(psutil
.get_pid_list()))
1474 d
= p
.as_dict(attrs
=['get_connections'], ad_value
='foo')
1475 if not isinstance(d
['connections'], list):
1476 self
.assertEqual(d
['connections'], 'foo')
1478 def test_zombie_process(self
):
1479 # Test that NoSuchProcess exception gets raised in case the
1480 # process dies after we create the Process object.
1482 # >>> proc = Process(1234)
1483 # >>> time.sleep(5) # time-consuming task, process dies in meantime
1485 # Refers to Issue #15
1486 sproc
= get_test_subprocess()
1487 p
= psutil
.Process(sproc
.pid
)
1492 if name
.startswith('_')\
1493 or name
in ('pid', 'send_signal', 'is_running', 'set_ionice',
1494 'wait', 'set_cpu_affinity', 'create_time', 'set_nice',
1498 meth
= getattr(p
, name
)
1501 except psutil
.NoSuchProcess
:
1504 self
.fail("NoSuchProcess exception not raised for %r" % name
)
1508 if os
.name
== 'posix':
1511 p
.set_nice(psutil
.NORMAL_PRIORITY_CLASS
)
1512 except psutil
.NoSuchProcess
:
1515 self
.fail("exception not raised")
1516 if hasattr(p
, 'set_ionice'):
1517 self
.assertRaises(psutil
.NoSuchProcess
, p
.set_ionice
, 2)
1518 self
.assertRaises(psutil
.NoSuchProcess
, p
.send_signal
, signal
.SIGTERM
)
1519 self
.assertRaises(psutil
.NoSuchProcess
, p
.set_nice
, 0)
1520 self
.assertFalse(p
.is_running())
1521 if hasattr(p
, "set_cpu_affinity"):
1522 self
.assertRaises(psutil
.NoSuchProcess
, p
.set_cpu_affinity
, [0])
1524 def test__str__(self
):
1525 sproc
= get_test_subprocess()
1526 p
= psutil
.Process(sproc
.pid
)
1527 self
.assertIn(str(sproc
.pid
), str(p
))
1528 # python shows up as 'Python' in cmdline on OS X so test fails on OS X
1530 self
.assertIn(os
.path
.basename(PYTHON
), str(p
))
1531 sproc
= get_test_subprocess()
1532 p
= psutil
.Process(sproc
.pid
)
1535 self
.assertIn(str(sproc
.pid
), str(p
))
1536 self
.assertIn("terminated", str(p
))
1539 def test_pid_0(self
):
1540 # Process(0) is supposed to work on all platforms except Linux
1541 p
= psutil
.Process(0)
1542 self
.assertTrue(p
.name
)
1544 if os
.name
== 'posix':
1545 self
.assertEqual(p
.uids
.real
, 0)
1546 self
.assertEqual(p
.gids
.real
, 0)
1548 self
.assertIn(p
.ppid
, (0, 1))
1549 #self.assertEqual(p.exe, "")
1550 self
.assertEqual(p
.cmdline
, [])
1553 except psutil
.AccessDenied
:
1558 except psutil
.AccessDenied
:
1563 self
.assertEqual(p
.username
, 'root')
1565 self
.assertEqual(p
.username
, 'NT AUTHORITY\\SYSTEM')
1569 self
.assertIn(0, psutil
.get_pid_list())
1570 self
.assertTrue(psutil
.pid_exists(0))
1572 def test_Popen(self
):
1574 # XXX this test causes a ResourceWarning on Python 3 because
1575 # psutil.__subproc instance doesn't get propertly freed.
1576 # Not sure what to do though.
1577 cmd
= [PYTHON
, "-c", "import time; time.sleep(3600);"]
1578 proc
= psutil
.Popen(cmd
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
1582 self
.assertTrue(hasattr(proc
, 'name'))
1583 self
.assertTrue(hasattr(proc
, 'stdin'))
1584 self
.assertRaises(AttributeError, getattr, proc
, 'foo')
1590 class TestFetchAllProcesses(unittest
.TestCase
):
1591 # Iterates over all running processes and performs some sanity
1592 # checks against Process API's returned values.
1594 # Python 2.4 compatibility
1595 if not hasattr(unittest
.TestCase
, "assertIn"):
1596 def assertIn(self
, member
, container
, msg
=None):
1597 if member
not in container
:
1599 '%s not found in %s' % (repr(member
), repr(container
)))
1604 pall
= pwd
.getpwall()
1605 self
._uids
= set([x
.pw_uid
for x
in pall
])
1606 self
._usernames
= set([x
.pw_name
for x
in pall
])
1608 def test_fetch_all(self
):
1610 excluded_names
= ['send_signal', 'suspend', 'resume', 'terminate',
1611 'kill', 'wait', 'as_dict', 'get_cpu_percent', 'nice',
1612 'parent', 'get_children', 'pid']
1614 for name
in dir(psutil
.Process
):
1615 if name
.startswith("_"):
1617 if name
.startswith("set_"):
1619 if name
in excluded_names
:
1623 for p
in psutil
.process_iter():
1627 attr
= getattr(p
, name
, None)
1628 if attr
is not None and callable(attr
):
1633 except (psutil
.NoSuchProcess
, psutil
.AccessDenied
):
1634 err
= sys
.exc_info()[1]
1635 self
.assertEqual(err
.pid
, p
.pid
)
1637 # make sure exception's name attr is set
1638 # with the actual process name
1639 self
.assertEqual(err
.name
, p
.name
)
1640 self
.assertTrue(str(err
))
1641 self
.assertTrue(err
.msg
)
1643 if ret
not in (0, 0.0, [], None, ''):
1645 meth
= getattr(self
, name
)
1648 err
= sys
.exc_info()[1]
1649 trace
= traceback
.format_exc()
1650 self
.fail('%s\nproc=%s, method=%r, retvalue=%r'
1651 % (trace
, p
, name
, ret
))
1653 # we should always have a non-empty list, not including PID 0 etc.
1655 self
.assertTrue(valid_procs
> 0)
1657 def cmdline(self
, ret
):
1664 assert os
.path
.isabs(ret
), ret
1665 # Note: os.stat() may return False even if the file is there
1666 # hence we skip the test, see:
1667 # http://stackoverflow.com/questions/3112546/os-path-exists-lies
1669 assert os
.path
.isfile(ret
), ret
1670 if hasattr(os
, 'access') and hasattr(os
, "X_OK"):
1671 self
.assertTrue(os
.access(ret
, os
.X_OK
))
1673 def ppid(self
, ret
):
1674 self
.assertTrue(ret
>= 0)
1676 def name(self
, ret
):
1677 self
.assertTrue(isinstance(ret
, str))
1678 self
.assertTrue(ret
)
1680 def create_time(self
, ret
):
1681 self
.assertTrue(ret
> 0)
1683 self
.assertTrue(ret
>= psutil
.BOOT_TIME
)
1684 # make sure returned value can be pretty printed
1686 time
.strftime("%Y %m %d %H:%M:%S", time
.localtime(ret
))
1688 def uids(self
, ret
):
1690 self
.assertTrue(uid
>= 0)
1691 self
.assertIn(uid
, self
._uids
)
1693 def gids(self
, ret
):
1694 # note: testing all gids as above seems not to be reliable for
1695 # gid == 30 (nodoby); not sure why.
1697 self
.assertTrue(gid
>= 0)
1698 #self.assertIn(uid, self.gids)
1700 def username(self
, ret
):
1701 self
.assertTrue(ret
)
1702 if os
.name
== 'posix':
1703 self
.assertIn(ret
, self
._usernames
)
1705 def status(self
, ret
):
1706 self
.assertTrue(ret
>= 0)
1707 self
.assertTrue(str(ret
) != '?')
1709 def get_io_counters(self
, ret
):
1712 self
.assertTrue(field
>= 0)
1714 def get_ionice(self
, ret
):
1715 self
.assertTrue(ret
.ioclass
>= 0)
1716 self
.assertTrue(ret
.value
>= 0)
1718 def get_num_threads(self
, ret
):
1719 self
.assertTrue(ret
>= 1)
1721 def get_threads(self
, ret
):
1723 self
.assertTrue(t
.id >= 0)
1724 self
.assertTrue(t
.user_time
>= 0)
1725 self
.assertTrue(t
.system_time
>= 0)
1727 def get_cpu_times(self
, ret
):
1728 self
.assertTrue(ret
.user
>= 0)
1729 self
.assertTrue(ret
.system
>= 0)
1731 def get_memory_info(self
, ret
):
1732 self
.assertTrue(ret
.rss
>= 0)
1733 self
.assertTrue(ret
.vms
>= 0)
1735 def get_ext_memory_info(self
, ret
):
1736 for name
in ret
._fields
:
1737 self
.assertTrue(getattr(ret
, name
) >= 0)
1738 if POSIX
and ret
.vms
!= 0:
1739 # VMS is always supposed to be the highest
1740 for name
in ret
._fields
:
1742 value
= getattr(ret
, name
)
1743 assert ret
.vms
> value
, ret
1745 assert ret
.peak_wset
>= ret
.wset
, ret
1746 assert ret
.peak_paged_pool
>= ret
.paged_pool
, ret
1747 assert ret
.peak_nonpaged_pool
>= ret
.nonpaged_pool
, ret
1748 assert ret
.peak_pagefile
>= ret
.pagefile
, ret
1750 def get_open_files(self
, ret
):
1753 assert f
.fd
== -1, f
1755 assert isinstance(f
.fd
, int), f
1756 assert os
.path
.isabs(f
.path
), f
1757 assert os
.path
.isfile(f
.path
), f
1759 def get_num_fds(self
, ret
):
1760 self
.assertTrue(ret
>= 0)
1762 def get_connections(self
, ret
):
1763 # all values are supposed to match Linux's tcp_states.h states
1764 # table across all platforms.
1765 valid_conn_states
= ["ESTABLISHED", "SYN_SENT", "SYN_RECV", "FIN_WAIT1",
1766 "FIN_WAIT2", "TIME_WAIT", "CLOSE", "CLOSE_WAIT",
1767 "LAST_ACK", "LISTEN", "CLOSING", ""]
1769 self
.assertIn(conn
.type, (socket
.SOCK_STREAM
, socket
.SOCK_DGRAM
))
1770 self
.assertIn(conn
.family
, (socket
.AF_INET
, socket
.AF_INET6
))
1771 check_ip_address(conn
.local_address
, conn
.family
)
1772 check_ip_address(conn
.remote_address
, conn
.family
)
1773 if conn
.status
not in valid_conn_states
:
1774 self
.fail("%s is not a valid status" %conn
.status
)
1775 # actually try to bind the local socket; ignore IPv6
1776 # sockets as their address might be represented as
1777 # an IPv4-mapped-address (e.g. "::127.0.0.1")
1778 # and that's rejected by bind()
1779 if conn
.family
== socket
.AF_INET
:
1780 s
= socket
.socket(conn
.family
, conn
.type)
1781 s
.bind((conn
.local_address
[0], 0))
1784 if not WINDOWS
and hasattr(socket
, 'fromfd'):
1788 dupsock
= socket
.fromfd(conn
.fd
, conn
.family
, conn
.type)
1789 except (socket
.error
, OSError):
1790 err
= sys
.exc_info()[1]
1791 if err
.args
[0] == errno
.EBADF
:
1795 if hasattr(dupsock
, "family"):
1796 self
.assertEqual(dupsock
.family
, conn
.family
)
1797 self
.assertEqual(dupsock
.type, conn
.type)
1799 if dupsock
is not None:
1802 def getcwd(self
, ret
):
1803 if ret
is not None: # BSD may return None
1804 assert os
.path
.isabs(ret
), ret
1808 err
= sys
.exc_info()[1]
1809 # directory has been removed in mean time
1810 if err
.errno
!= errno
.ENOENT
:
1813 self
.assertTrue(stat
.S_ISDIR(st
.st_mode
))
1815 def get_memory_percent(self
, ret
):
1816 assert 0 <= ret
<= 100, ret
1818 def is_running(self
, ret
):
1819 self
.assertTrue(ret
)
1821 def get_cpu_affinity(self
, ret
):
1822 assert ret
!= [], ret
1824 def terminal(self
, ret
):
1826 assert os
.path
.isabs(ret
), ret
1827 assert os
.path
.exists(ret
), ret
1829 def get_memory_maps(self
, ret
):
1831 for fname
in nt
._fields
:
1832 value
= getattr(nt
, fname
)
1834 if not value
.startswith('['):
1835 assert os
.path
.isabs(nt
.path
), nt
.path
1836 # commented as on Linux we might get '/foo/bar (deleted)'
1837 #assert os.path.exists(nt.path), nt.path
1838 elif fname
in ('addr', 'perms'):
1839 self
.assertTrue(value
)
1841 assert isinstance(value
, (int, long))
1842 assert value
>= 0, value
1844 def get_num_handles(self
, ret
):
1850 def get_nice(self
, ret
):
1852 assert -20 <= ret
<= 20, ret
1854 priorities
= [getattr(psutil
, x
) for x
in dir(psutil
)
1855 if x
.endswith('_PRIORITY_CLASS')]
1856 self
.assertIn(ret
, priorities
)
1858 def get_num_ctx_switches(self
, ret
):
1859 self
.assertTrue(ret
.voluntary
>= 0)
1860 self
.assertTrue(ret
.involuntary
>= 0)
1862 if hasattr(os
, 'getuid'):
1863 class LimitedUserTestCase(TestCase
):
1864 """Repeat the previous tests by using a limited user.
1865 Executed only on UNIX and only if the user who run the test script
1868 # the uid/gid the test suite runs under
1869 PROCESS_UID
= os
.getuid()
1870 PROCESS_GID
= os
.getgid()
1872 def __init__(self
, *args
, **kwargs
):
1873 TestCase
.__init
__(self
, *args
, **kwargs
)
1874 # re-define all existent test methods in order to
1875 # ignore AccessDenied exceptions
1876 for attr
in [x
for x
in dir(self
) if x
.startswith('test')]:
1877 meth
= getattr(self
, attr
)
1881 except psutil
.AccessDenied
:
1883 setattr(self
, attr
, types
.MethodType(test_
, self
))
1888 TestCase
.setUp(self
)
1891 os
.setegid(self
.PROCESS_UID
)
1892 os
.seteuid(self
.PROCESS_GID
)
1893 TestCase
.tearDown(self
)
1895 def test_nice(self
):
1897 psutil
.Process(os
.getpid()).set_nice(-1)
1898 except psutil
.AccessDenied
:
1901 self
.fail("exception not raised")
1905 reap_children(search_all
=True)
1909 atexit
.register(cleanup
)
1913 test_suite
= unittest
.TestSuite()
1914 tests
.append(TestCase
)
1915 tests
.append(TestFetchAllProcesses
)
1918 from _posix
import PosixSpecificTestCase
1919 tests
.append(PosixSpecificTestCase
)
1921 # import the specific platform test suite
1923 from _linux
import LinuxSpecificTestCase
as stc
1925 from _windows
import WindowsSpecificTestCase
as stc
1926 from _windows
import TestDualProcessImplementation
1927 tests
.append(TestDualProcessImplementation
)
1929 from _osx
import OSXSpecificTestCase
as stc
1931 from _bsd
import BSDSpecificTestCase
as stc
1934 if hasattr(os
, 'getuid'):
1935 if os
.getuid() == 0:
1936 tests
.append(LimitedUserTestCase
)
1938 atexit
.register(warn
, "Couldn't run limited user tests ("
1939 "super-user privileges are required)")
1941 for test_class
in tests
:
1942 test_suite
.addTest(unittest
.makeSuite(test_class
))
1943 unittest
.TextTestRunner(verbosity
=2).run(test_suite
)
1945 if __name__
== '__main__':