From 1153176f4f31bb07f3b1e48ae99fc6d91a43b865 Mon Sep 17 00:00:00 2001 From: Benedek Racz Date: Mon, 6 Apr 2020 13:07:49 +0200 Subject: [PATCH] [TST] Improove code coverity --- tests/test_destructor.py | 12 +++ tests/test_misc.py | 9 ++- tests/test_parametric_printer_coverage.py | 69 +++++++++++++++++ tests/test_timeout_pattern.py | 94 +++++++++++++++++++++++ wexpect/__main__.py | 12 +-- wexpect/console_reader.py | 5 +- wexpect/host.py | 21 ++--- wexpect/wexpect_util.py | 19 ++++- 8 files changed, 213 insertions(+), 28 deletions(-) create mode 100644 tests/test_parametric_printer_coverage.py create mode 100644 tests/test_timeout_pattern.py diff --git a/tests/test_destructor.py b/tests/test_destructor.py index de32ea1..2976f7d 100644 --- a/tests/test_destructor.py +++ b/tests/test_destructor.py @@ -52,6 +52,18 @@ class TestCaseDestructor(PexpectTestCase.PexpectTestCase): p3 = wexpect.spawn('ls', port=4323) p4 = wexpect.spawn('ls', port=4324) + @unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy unsupported") + def test_failed_termination(self): + "Test failed termination." + child = wexpect.spawn('cat') + delayafterterminate_orig = child.delayafterterminate + child.delayafterterminate =.1 + + msg = 'Child has not been terminated even after it was killed.' + with self.assertRaisesRegex(wexpect.ExceptionPexpect, msg): + child.terminate() + + child.isalive(timeout = delayafterterminate_orig) if __name__ == '__main__': unittest.main() diff --git a/tests/test_misc.py b/tests/test_misc.py index b253f37..280e65f 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -17,13 +17,14 @@ PYBIN = '"{}"'.format(sys.executable) class TestCaseMisc(PexpectTestCase.PexpectTestCase): + def test_wrong_command(self): + "Test raise of exception in case of wrong program" + with self.assertRaisesRegex(wexpect.ExceptionPexpect,"The command was not found.+"): + child = wexpect.spawn('unexecutable') + def test_isatty(self): " Test isatty() is True after spawning process on most platforms. " child = wexpect.spawn('cat') - if not child.isatty() and sys.platform.lower().startswith('sunos'): - if hasattr(unittest, 'SkipTest'): - raise unittest.SkipTest("Not supported on this platform.") - return 'skip' assert child.isatty() def test_read(self): diff --git a/tests/test_parametric_printer_coverage.py b/tests/test_parametric_printer_coverage.py new file mode 100644 index 0000000..f725775 --- /dev/null +++ b/tests/test_parametric_printer_coverage.py @@ -0,0 +1,69 @@ +import wexpect +import unittest +import sys +import os +import time +from tests import PexpectTestCase + +@unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy unsupported") +class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase): + def test_all_line_length (self): + + here = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, here) + + # With quotes (C:\Program Files\Python37\python.exe needs quotes) + python_executable = '"' + sys.executable + '" ' + child_script = here + '\\parametric_printer.py' + + self.prompt = '> ' + + # Start the child process + self.p = wexpect.spawn(python_executable + ' ' + child_script, coverage_console_reader=True) + # Wait for prompt + self.p.expect(self.prompt) + + self._test(['a'], range(1,200), [1], [0]) + + self.p.terminate() + + def test_long_console(self): + + here = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, here) + + # With quotes (C:\Program Files\Python37\python.exe needs quotes) + python_executable = '"' + sys.executable + '" ' + child_script = here + '\\parametric_printer.py' + + self.prompt = '> ' + + # Start the child process + self.p = wexpect.spawn(python_executable + ' ' + child_script, coverage_console_reader=True) + # Wait for prompt + self.p.expect(self.prompt) + + self._test(['a', 'b', 'c', 'd', 'e', 'f'], [8, 16, 32, 64], [64, 128, 256], [-1, 0]) + + self.p.terminate() + + def _test(self, character_list, character_count_list, line_count_list, speed_ms_list): + + # print(f'character_list: {character_list} character_count_list: {character_count_list} line_count_list: {line_count_list} speed_ms_list: {speed_ms_list}') + for character in character_list: + for character_count in character_count_list: + for line_count in line_count_list: + for speed_ms in speed_ms_list: + command = f'{character},{character_count},{line_count},{speed_ms}' + self.p.sendline(command) + self.p.expect(self.prompt) + expected = [character*character_count] * line_count + try: + self.assertEqual(self.p.before.splitlines()[1:-1], expected) + except: + raise + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(TestCaseParametricPrinter,'test') diff --git a/tests/test_timeout_pattern.py b/tests/test_timeout_pattern.py new file mode 100644 index 0000000..01d2bd0 --- /dev/null +++ b/tests/test_timeout_pattern.py @@ -0,0 +1,94 @@ +import multiprocessing +import unittest +import subprocess +import time +import signal +import sys +import os + +import wexpect +from tests import PexpectTestCase + +@unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy unsupported") +class Exp_TimeoutTestCase(PexpectTestCase.PexpectTestCase): + def test_matches_exp_timeout (self): + '''This tests that we can raise and catch TIMEOUT. + ''' + try: + raise wexpect.TIMEOUT("TIMEOUT match test") + except wexpect.TIMEOUT: + pass + #print "Correctly caught TIMEOUT when raising TIMEOUT." + else: + self.fail('TIMEOUT not caught by an except TIMEOUT clause.') + + def test_pattern_printout (self): + '''Verify that a TIMEOUT returns the proper patterns it is trying to match against. + Make sure it is returning the pattern from the correct call.''' + try: + p = wexpect.spawn('cat') + p.sendline('Hello') + p.expect('Hello') + p.expect('Goodbye',timeout=5) + except wexpect.TIMEOUT: + assert p.match_index == None + else: + self.fail("Did not generate a TIMEOUT exception.") + + # Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test. + if wexpect.spawn_class_name == 'SpawnSocket': + p.terminate() + + def test_exp_timeout_notThrown (self): + '''Verify that a TIMEOUT is not thrown when we match what we expect.''' + try: + p = wexpect.spawn('cat') + p.sendline('Hello') + p.expect('Hello') + except wexpect.TIMEOUT: + self.fail("TIMEOUT caught when it shouldn't be raised because we match the proper pattern.") + + # Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test. + if wexpect.spawn_class_name == 'SpawnSocket': + p.terminate() + + def test_stacktraceMunging (self): + '''Verify that the stack trace returned with a TIMEOUT instance does not contain references to wexpect.''' + try: + p = wexpect.spawn('cat') + p.sendline('Hello') + p.expect('Goodbye',timeout=5) + except wexpect.TIMEOUT: + err = sys.exc_info()[1] + if err.get_trace().count("wexpect/__init__.py") != 0: + self.fail("The TIMEOUT get_trace() referenced wexpect.py. " + "It should only reference the caller.\n" + err.get_trace()) + + # Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test. + if wexpect.spawn_class_name == 'SpawnSocket': + p.terminate() + + def test_correctStackTrace (self): + '''Verify that the stack trace returned with a TIMEOUT instance correctly handles function calls.''' + def nestedFunction (spawnInstance): + spawnInstance.expect("junk", timeout=3) + + try: + p = wexpect.spawn('cat') + p.sendline('Hello') + nestedFunction(p) + except wexpect.TIMEOUT: + err = sys.exc_info()[1] + if err.get_trace().count("nestedFunction") == 0: + self.fail("The TIMEOUT get_trace() did not show the call " + "to the nestedFunction function.\n" + str(err) + "\n" + + err.get_trace()) + + # Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test. + if wexpect.spawn_class_name == 'SpawnSocket': + p.terminate() + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(Exp_TimeoutTestCase,'test') diff --git a/wexpect/__main__.py b/wexpect/__main__.py index 48f461b..13e858f 100644 --- a/wexpect/__main__.py +++ b/wexpect/__main__.py @@ -10,16 +10,6 @@ import wexpect.wexpect_util as wexpect_util logger = logging.getLogger('wexpect') logger.info('Hello') -def str2bool(v): - if isinstance(v, bool): - return v - if v.lower() in ('yes', 'true', 't', 'y', '1'): - return True - elif v.lower() in ('no', 'false', 'f', 'n', '0'): - return False - else: - raise argparse.ArgumentTypeError('Boolean value expected.') - def main(): try: parser = argparse.ArgumentParser(description='Wexpect: executable automation for Windows.') @@ -61,7 +51,7 @@ def main(): path=command, host_pid=args.host_pid, codepage=args.codepage, port=args.port, window_size_x=args.window_size_x, window_size_y=args.window_size_y, buffer_size_x=args.buffer_size_x, buffer_size_y=args.buffer_size_y, - local_echo=str2bool(args.local_echo), interact=str2bool(args.interact)) + local_echo=wexpect_util.str2bool(args.local_echo), interact=wexpect_util.str2bool(args.interact)) logger.info(f'Exiting with status: {cons.child_exitstatus}') sys.exit(cons.child_exitstatus) diff --git a/wexpect/console_reader.py b/wexpect/console_reader.py index 2da6c1a..72dd4e0 100644 --- a/wexpect/console_reader.py +++ b/wexpect/console_reader.py @@ -463,8 +463,9 @@ class ConsoleReaderSocket(ConsoleReaderBase): self.connection, client_address = self.sock.accept() self.connection.settimeout(.01) logger.info(f'Client connected: {client_address}') - except Exception: - logger.error(f"Port: {self.port}") + except Exception as e: # pragma: no cover + # I hope this code is unreachable. + logger.error(f"Port: {self.port} {e}") raise def close_connection(self): diff --git a/wexpect/host.py b/wexpect/host.py index 0bd89f7..8a62f85 100644 --- a/wexpect/host.py +++ b/wexpect/host.py @@ -285,9 +285,10 @@ class SpawnBase: self.disconnect_from_child() if self.safe_exit: self.wait() - except Exception: - traceback.print_exc() - logger.warning(traceback.format_exc()) + except Exception: # pragma: no cover + # I hope this code is unreachable... + logger.error(traceback.format_exc()) + raise def __str__(self): """This returns a human-readable string that represents the state of @@ -354,11 +355,13 @@ class SpawnBase: # Deep copy needed to prevent cycle-to-cycle growth. See #31 for more details. environ = os.environ.copy() - if getattr(sys, 'frozen', False): + if getattr(sys, 'frozen', False): # pragma: no cover '''Runing in a PyInstaller bundle: Pyinstaller has no explicit python interpreter, so console-reader should be bundled also, and host should call it as a windows executable. + This code cannot be covered during tests, because it runs only in bundled way. + https://pyinstaller.readthedocs.io/en/stable/runtime-information.html#using-sys-executable-and-sys-argv-0 https://github.com/pyinstaller/pyinstaller/issues/822 ''' @@ -428,7 +431,7 @@ class SpawnBase: if not self.isalive(timeout = self.delayafterterminate): return True - return False + raise ExceptionPexpect("Child has not been terminated even after it was killed.") def isalive(self, trust_console=True, timeout=0): """True if the child is still alive, false otherwise""" @@ -522,7 +525,7 @@ class SpawnBase: return self - def read_nonblocking(self, size=1): + def read_nonblocking(self, size=1): # pragma: no cover """Virtual definition """ raise NotImplementedError @@ -610,17 +613,17 @@ class SpawnBase: return self._send_impl(s) - def _send_impl(self, s): + def _send_impl(self, s): # pragma: no cover """Virtual definition """ raise NotImplementedError - def connect_to_child(self): + def connect_to_child(self): # pragma: no cover """Virtual definition """ raise NotImplementedError - def disconnect_from_child(self): + def disconnect_from_child(self): # pragma: no cover """Virtual definition """ raise NotImplementedError diff --git a/wexpect/wexpect_util.py b/wexpect/wexpect_util.py index 03ebf14..7987c2d 100644 --- a/wexpect/wexpect_util.py +++ b/wexpect/wexpect_util.py @@ -27,7 +27,20 @@ SPAM = 5 logging.addLevelName(SPAM, "SPAM") -def spam(self, message, *args, **kws): +def str2bool(v): + if isinstance(v, bool): + return v + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: # pragma: no cover + raise argparse.ArgumentTypeError('Boolean value expected.') + + +def spam(self, message, *args, **kws): # pragma: no cover + '''Very verbose debug dunction. + ''' if self.isEnabledFor(SPAM): # Yes, logger takes its '*args' as 'args'. self._log(SPAM, message, args, **kws) @@ -36,7 +49,9 @@ def spam(self, message, *args, **kws): logging.Logger.spam = spam -def init_logger(logger=None): +def init_logger(logger=None): # pragma: no cover + '''Initializes the logger. I wont measure coverage for this debug method. + ''' if logger is None: logger = logging.getLogger('wexpect') try: