diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index 77a90a45e..28d1556ec 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -1312,20 +1312,25 @@ def print_section(section, info): print_section( "ps aux", subprocess.check_output(["ps", "aux"]).decode()) + with open("/proc/self/mountinfo") as f: + data = f.read() + print_section("mountinfo 1", data) + print_section("mountinfo 2", repr(data) + print("=" * 70, file=sys.stderr) # NOQA sys.stdout.flush() def is_win_secure_system_proc(pid): # see: https://github.com/giampaolo/psutil/issues/2338 - @memoize + @ memoize def get_procs(): - ret = {} - out = sh("tasklist.exe /NH /FO csv") + ret={} + out=sh("tasklist.exe /NH /FO csv") for line in out.splitlines()[1:]: - bits = [x.replace('"', "") for x in line.split(",")] - name, pid = bits[0], int(bits[1]) - ret[pid] = name + bits=[x.replace('"', "") for x in line.split(",")] + name, pid=bits[0], int(bits[1]) + ret[pid]=name return ret try: @@ -1335,7 +1340,7 @@ def get_procs(): def _get_eligible_cpu(): - p = psutil.Process() + p=psutil.Process() if hasattr(p, "cpu_num"): return p.cpu_num() elif hasattr(p, "cpu_affinity"): @@ -1353,12 +1358,12 @@ class process_namespace: ... fun() """ - utils = [ + utils=[ ('cpu_percent', (), {}), ('memory_percent', (), {}), ] - ignored = [ + ignored=[ ('as_dict', (), {}), ('children', (), {'recursive': True}), ('is_running', (), {}), @@ -1370,7 +1375,7 @@ class process_namespace: ('wait', (0, ), {}), ] - getters = [ + getters=[ ('cmdline', (), {}), ('connections', (), {'kind': 'all'}), ('cpu_times', (), {}), @@ -1411,7 +1416,7 @@ class process_namespace: if HAS_MEMORY_MAPS: getters += [('memory_maps', (), {'grouped': False})] - setters = [] + setters=[] if POSIX: setters += [('nice', (0, ), {})] else: @@ -1426,7 +1431,7 @@ class process_namespace: if HAS_CPU_AFFINITY: setters += [('cpu_affinity', ([_get_eligible_cpu()], ), {})] - killers = [ + killers=[ ('send_signal', (signal.SIGTERM, ), {}), ('suspend', (), {}), ('resume', (), {}), @@ -1437,46 +1442,46 @@ class process_namespace: killers += [('send_signal', (signal.CTRL_C_EVENT, ), {})] killers += [('send_signal', (signal.CTRL_BREAK_EVENT, ), {})] - all = utils + getters + setters + killers + all=utils + getters + setters + killers def __init__(self, proc): - self._proc = proc + self._proc=proc def iter(self, ls, clear_cache=True): """Given a list of tuples yields a set of (fun, fun_name) tuples in random order. """ - ls = list(ls) + ls=list(ls) random.shuffle(ls) for fun_name, args, kwds in ls: if clear_cache: self.clear_cache() - fun = getattr(self._proc, fun_name) - fun = functools.partial(fun, *args, **kwds) + fun=getattr(self._proc, fun_name) + fun=functools.partial(fun, *args, **kwds) yield (fun, fun_name) def clear_cache(self): """Clear the cache of a Process instance.""" self._proc._init(self._proc.pid, _ignore_nsp=True) - @classmethod + @ classmethod def test_class_coverage(cls, test_class, ls): """Given a TestCase instance and a list of tuples checks that the class defines the required test method names. """ for fun_name, _, _ in ls: - meth_name = 'test_' + fun_name + meth_name='test_' + fun_name if not hasattr(test_class, meth_name): - msg = "%r class should define a '%s' method" % ( + msg="%r class should define a '%s' method" % ( test_class.__class__.__name__, meth_name) raise AttributeError(msg) - @classmethod + @ classmethod def test(cls): - this = set([x[0] for x in cls.all]) - ignored = set([x[0] for x in cls.ignored]) - klass = set([x for x in dir(psutil.Process) if x[0] != '_']) - leftout = (this | ignored) ^ klass + this=set([x[0] for x in cls.all]) + ignored=set([x[0] for x in cls.ignored]) + klass=set([x for x in dir(psutil.Process) if x[0] != '_']) + leftout=(this | ignored) ^ klass if leftout: raise ValueError("uncovered Process class names: %r" % leftout) @@ -1490,7 +1495,7 @@ class system_namespace: ... fun() """ - getters = [ + getters=[ ('boot_time', (), {}), ('cpu_count', (), {'logical': False}), ('cpu_count', (), {'logical': True}), @@ -1524,28 +1529,28 @@ class system_namespace: getters += [('win_service_iter', (), {})] getters += [('win_service_get', ('alg', ), {})] - ignored = [ + ignored=[ ('process_iter', (), {}), ('wait_procs', ([psutil.Process()], ), {}), ('cpu_percent', (), {}), ('cpu_times_percent', (), {}), ] - all = getters + all=getters - @staticmethod + @ staticmethod def iter(ls): """Given a list of tuples yields a set of (fun, fun_name) tuples in random order. """ - ls = list(ls) + ls=list(ls) random.shuffle(ls) for fun_name, args, kwds in ls: - fun = getattr(psutil, fun_name) - fun = functools.partial(fun, *args, **kwds) + fun=getattr(psutil, fun_name) + fun=functools.partial(fun, *args, **kwds) yield (fun, fun_name) - test_class_coverage = process_namespace.test_class_coverage + test_class_coverage=process_namespace.test_class_coverage def serialrun(klass): @@ -1554,7 +1559,7 @@ def serialrun(klass): """ # assert issubclass(klass, unittest.TestCase), klass assert inspect.isclass(klass), klass - klass._serialrun = True + klass._serialrun=True return klass @@ -1572,7 +1577,7 @@ def logfun(exc): def skip_on_access_denied(only_if=None): """Decorator to Ignore AccessDenied exceptions.""" def decorator(fun): - @functools.wraps(fun) + @ functools.wraps(fun) def wrapper(*args, **kwargs): try: return fun(*args, **kwargs) @@ -1588,7 +1593,7 @@ def wrapper(*args, **kwargs): def skip_on_not_implemented(only_if=None): """Decorator to Ignore NotImplementedError exceptions.""" def decorator(fun): - @functools.wraps(fun) + @ functools.wraps(fun) def wrapper(*args, **kwargs): try: return fun(*args, **kwargs) @@ -1596,7 +1601,7 @@ def wrapper(*args, **kwargs): if only_if is not None: if not only_if: raise - msg = "%r was skipped because it raised NotImplementedError" \ + msg="%r was skipped because it raised NotImplementedError" \ % fun.__name__ raise unittest.SkipTest(msg) return wrapper @@ -1619,8 +1624,8 @@ def get_free_port(host='127.0.0.1'): def bind_socket(family=AF_INET, type=SOCK_STREAM, addr=None): """Binds a generic socket.""" if addr is None and family in (AF_INET, AF_INET6): - addr = ("", 0) - sock = socket.socket(family, type) + addr=("", 0) + sock=socket.socket(family, type) try: if os.name not in ('nt', 'cygwin'): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -1637,7 +1642,7 @@ def bind_unix_socket(name, type=socket.SOCK_STREAM): """Bind a UNIX socket.""" assert psutil.POSIX assert not os.path.exists(name), name - sock = socket.socket(socket.AF_UNIX, type) + sock=socket.socket(socket.AF_UNIX, type) try: sock.bind(name) if type == socket.SOCK_STREAM: @@ -1655,13 +1660,13 @@ def tcp_socketpair(family, addr=("", 0)): with contextlib.closing(socket.socket(family, SOCK_STREAM)) as ll: ll.bind(addr) ll.listen(5) - addr = ll.getsockname() - c = socket.socket(family, SOCK_STREAM) + addr=ll.getsockname() + c=socket.socket(family, SOCK_STREAM) try: c.connect(addr) - caddr = c.getsockname() + caddr=c.getsockname() while True: - a, addr = ll.accept() + a, addr=ll.accept() # check that we've got the correct client if addr == caddr: return (a, c) @@ -1677,11 +1682,11 @@ def unix_socketpair(name): Return a (server, client) tuple. """ assert psutil.POSIX - server = client = None + server=client=None try: - server = bind_unix_socket(name, type=socket.SOCK_STREAM) + server=bind_unix_socket(name, type=socket.SOCK_STREAM) server.setblocking(0) - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) client.setblocking(0) client.connect(name) # new = server.accept() @@ -1694,11 +1699,11 @@ def unix_socketpair(name): return (server, client) -@contextlib.contextmanager +@ contextlib.contextmanager def create_sockets(): """Open as many socket families / types as possible.""" - socks = [] - fname1 = fname2 = None + socks=[] + fname1=fname2=None try: socks.append(bind_socket(socket.AF_INET, socket.SOCK_STREAM)) socks.append(bind_socket(socket.AF_INET, socket.SOCK_DGRAM)) @@ -1706,10 +1711,10 @@ def create_sockets(): socks.append(bind_socket(socket.AF_INET6, socket.SOCK_STREAM)) socks.append(bind_socket(socket.AF_INET6, socket.SOCK_DGRAM)) if POSIX and HAS_CONNECTIONS_UNIX: - fname1 = get_testfn() - fname2 = get_testfn() - s1, s2 = unix_socketpair(fname1) - s3 = bind_unix_socket(fname2, type=socket.SOCK_DGRAM) + fname1=get_testfn() + fname2=get_testfn() + s1, s2=unix_socketpair(fname1) + s3=bind_unix_socket(fname2, type=socket.SOCK_DGRAM) for s in (s1, s2, s3): socks.append(s) yield socks @@ -1729,17 +1734,17 @@ def check_net_address(addr, family): if enum and PY3 and not PYPY: assert isinstance(family, enum.IntEnum), family if family == socket.AF_INET: - octs = [int(x) for x in addr.split('.')] + octs=[int(x) for x in addr.split('.')] assert len(octs) == 4, addr for num in octs: assert 0 <= num <= 255, addr if not PY3: - addr = unicode(addr) + addr=unicode(addr) ipaddress.IPv4Address(addr) elif family == socket.AF_INET6: assert isinstance(addr, str), addr if not PY3: - addr = unicode(addr) + addr=unicode(addr) ipaddress.IPv6Address(addr) elif family == psutil.AF_LINK: assert re.match(r'([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr @@ -1750,7 +1755,7 @@ def check_net_address(addr, family): def check_connection_ntuple(conn): """Check validity of a connection namedtuple.""" def check_ntuple(conn): - has_pid = len(conn) == 7 + has_pid=len(conn) == 7 assert len(conn) in (6, 7), len(conn) assert conn[0] == conn.fd, conn.fd assert conn[1] == conn.family, conn.family @@ -1772,7 +1777,7 @@ def check_family(conn): # sockets as their address might be represented as # an IPv4-mapped-address (e.g. "::127.0.0.1") # and that's rejected by bind() - s = socket.socket(conn.family, conn.type) + s=socket.socket(conn.family, conn.type) with contextlib.closing(s): try: s.bind((conn.laddr[0], 0)) @@ -1784,7 +1789,7 @@ def check_family(conn): def check_type(conn): # SOCK_SEQPACKET may happen in case of AF_UNIX socks - SOCK_SEQPACKET = getattr(socket, "SOCK_SEQPACKET", object()) + SOCK_SEQPACKET=getattr(socket, "SOCK_SEQPACKET", object()) assert conn.type in (socket.SOCK_STREAM, socket.SOCK_DGRAM, SOCK_SEQPACKET), conn.type if enum is not None: @@ -1809,7 +1814,7 @@ def check_addrs(conn): def check_status(conn): assert isinstance(conn.status, str), conn.status - valids = [getattr(psutil, x) for x in dir(psutil) + valids=[getattr(psutil, x) for x in dir(psutil) if x.startswith('CONN_')] assert conn.status in valids, conn.status if conn.family in (AF_INET, AF_INET6) and conn.type == SOCK_STREAM: @@ -1843,14 +1848,14 @@ def reload_module(module): def import_module_by_path(path): - name = os.path.splitext(os.path.basename(path))[0] + name=os.path.splitext(os.path.basename(path))[0] if sys.version_info[0] < 3: import imp return imp.load_source(name, path) else: import importlib.util - spec = importlib.util.spec_from_file_location(name, path) - mod = importlib.util.module_from_spec(spec) + spec=importlib.util.spec_from_file_location(name, path) + mod=importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod @@ -1867,30 +1872,30 @@ def warn(msg): def is_namedtuple(x): """Check if object is an instance of namedtuple.""" - t = type(x) - b = t.__bases__ + t=type(x) + b=t.__bases__ if len(b) != 1 or b[0] != tuple: return False - f = getattr(t, '_fields', None) + f=getattr(t, '_fields', None) if not isinstance(f, tuple): return False return all(isinstance(n, str) for n in f) if POSIX: - @contextlib.contextmanager + @ contextlib.contextmanager def copyload_shared_lib(suffix=""): """Ctx manager which picks up a random shared CO lib used by this process, copies it in another location and loads it in memory via ctypes. Return the new absolutized path. """ - exe = 'pypy' if PYPY else 'python' - ext = ".so" - dst = get_testfn(suffix=suffix + ext) - libs = [x.path for x in psutil.Process().memory_maps() if + exe='pypy' if PYPY else 'python' + ext=".so" + dst=get_testfn(suffix=suffix + ext) + libs=[x.path for x in psutil.Process().memory_maps() if os.path.splitext(x.path)[1] == ext and exe in x.path.lower()] - src = random.choice(libs) + src=random.choice(libs) shutil.copyfile(src, dst) try: ctypes.CDLL(dst) @@ -1898,7 +1903,7 @@ def copyload_shared_lib(suffix=""): finally: safe_rmpath(dst) else: - @contextlib.contextmanager + @ contextlib.contextmanager def copyload_shared_lib(suffix=""): """Ctx manager which picks up a random shared DLL lib used by this process, copies it in another location and loads it @@ -1907,20 +1912,20 @@ def copyload_shared_lib(suffix=""): """ from ctypes import WinError from ctypes import wintypes - ext = ".dll" - dst = get_testfn(suffix=suffix + ext) - libs = [x.path for x in psutil.Process().memory_maps() if + ext=".dll" + dst=get_testfn(suffix=suffix + ext) + libs=[x.path for x in psutil.Process().memory_maps() if x.path.lower().endswith(ext) and 'python' in os.path.basename(x.path).lower() and 'wow64' not in x.path.lower()] if PYPY and not libs: - libs = [x.path for x in psutil.Process().memory_maps() if + libs=[x.path for x in psutil.Process().memory_maps() if 'pypy' in os.path.basename(x.path).lower()] - src = random.choice(libs) + src=random.choice(libs) shutil.copyfile(src, dst) - cfile = None + cfile=None try: - cfile = ctypes.WinDLL(dst) + cfile=ctypes.WinDLL(dst) yield dst finally: # Work around OverflowError: @@ -1929,9 +1934,9 @@ def copyload_shared_lib(suffix=""): # - http://bugs.python.org/issue30286 # - http://stackoverflow.com/questions/23522055 if cfile is not None: - FreeLibrary = ctypes.windll.kernel32.FreeLibrary - FreeLibrary.argtypes = [wintypes.HMODULE] - ret = FreeLibrary(cfile._handle) + FreeLibrary=ctypes.windll.kernel32.FreeLibrary + FreeLibrary.argtypes=[wintypes.HMODULE] + ret=FreeLibrary(cfile._handle) if ret == 0: WinError() safe_rmpath(dst) @@ -1943,7 +1948,7 @@ def copyload_shared_lib(suffix=""): # this is executed first -@atexit.register +@ atexit.register def cleanup_test_procs(): reap_children(recursive=True)