[TST] Improove code coverity

This commit is contained in:
Benedek Racz 2020-04-06 13:07:49 +02:00
parent 4e38fd3f4d
commit 1153176f4f
8 changed files with 213 additions and 28 deletions

View File

@ -52,6 +52,18 @@ class TestCaseDestructor(PexpectTestCase.PexpectTestCase):
p3 = wexpect.spawn('ls', port=4323)
p4 = wexpect.spawn('ls', port=4324)
@unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy unsupported")
def test_failed_termination(self):
"Test failed termination."
child = wexpect.spawn('cat')
delayafterterminate_orig = child.delayafterterminate
child.delayafterterminate =.1
msg = 'Child has not been terminated even after it was killed.'
with self.assertRaisesRegex(wexpect.ExceptionPexpect, msg):
child.terminate()
child.isalive(timeout = delayafterterminate_orig)
if __name__ == '__main__':
unittest.main()

View File

@ -17,13 +17,14 @@ PYBIN = '"{}"'.format(sys.executable)
class TestCaseMisc(PexpectTestCase.PexpectTestCase):
def test_wrong_command(self):
"Test raise of exception in case of wrong program"
with self.assertRaisesRegex(wexpect.ExceptionPexpect,"The command was not found.+"):
child = wexpect.spawn('unexecutable')
def test_isatty(self):
" Test isatty() is True after spawning process on most platforms. "
child = wexpect.spawn('cat')
if not child.isatty() and sys.platform.lower().startswith('sunos'):
if hasattr(unittest, 'SkipTest'):
raise unittest.SkipTest("Not supported on this platform.")
return 'skip'
assert child.isatty()
def test_read(self):

View File

@ -0,0 +1,69 @@
import wexpect
import unittest
import sys
import os
import time
from tests import PexpectTestCase
@unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy unsupported")
class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase):
def test_all_line_length (self):
here = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, here)
# With quotes (C:\Program Files\Python37\python.exe needs quotes)
python_executable = '"' + sys.executable + '" '
child_script = here + '\\parametric_printer.py'
self.prompt = '> '
# Start the child process
self.p = wexpect.spawn(python_executable + ' ' + child_script, coverage_console_reader=True)
# Wait for prompt
self.p.expect(self.prompt)
self._test(['a'], range(1,200), [1], [0])
self.p.terminate()
def test_long_console(self):
here = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, here)
# With quotes (C:\Program Files\Python37\python.exe needs quotes)
python_executable = '"' + sys.executable + '" '
child_script = here + '\\parametric_printer.py'
self.prompt = '> '
# Start the child process
self.p = wexpect.spawn(python_executable + ' ' + child_script, coverage_console_reader=True)
# Wait for prompt
self.p.expect(self.prompt)
self._test(['a', 'b', 'c', 'd', 'e', 'f'], [8, 16, 32, 64], [64, 128, 256], [-1, 0])
self.p.terminate()
def _test(self, character_list, character_count_list, line_count_list, speed_ms_list):
# print(f'character_list: {character_list} character_count_list: {character_count_list} line_count_list: {line_count_list} speed_ms_list: {speed_ms_list}')
for character in character_list:
for character_count in character_count_list:
for line_count in line_count_list:
for speed_ms in speed_ms_list:
command = f'{character},{character_count},{line_count},{speed_ms}'
self.p.sendline(command)
self.p.expect(self.prompt)
expected = [character*character_count] * line_count
try:
self.assertEqual(self.p.before.splitlines()[1:-1], expected)
except:
raise
if __name__ == '__main__':
unittest.main()
suite = unittest.makeSuite(TestCaseParametricPrinter,'test')

View File

@ -0,0 +1,94 @@
import multiprocessing
import unittest
import subprocess
import time
import signal
import sys
import os
import wexpect
from tests import PexpectTestCase
@unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy unsupported")
class Exp_TimeoutTestCase(PexpectTestCase.PexpectTestCase):
def test_matches_exp_timeout (self):
'''This tests that we can raise and catch TIMEOUT.
'''
try:
raise wexpect.TIMEOUT("TIMEOUT match test")
except wexpect.TIMEOUT:
pass
#print "Correctly caught TIMEOUT when raising TIMEOUT."
else:
self.fail('TIMEOUT not caught by an except TIMEOUT clause.')
def test_pattern_printout (self):
'''Verify that a TIMEOUT returns the proper patterns it is trying to match against.
Make sure it is returning the pattern from the correct call.'''
try:
p = wexpect.spawn('cat')
p.sendline('Hello')
p.expect('Hello')
p.expect('Goodbye',timeout=5)
except wexpect.TIMEOUT:
assert p.match_index == None
else:
self.fail("Did not generate a TIMEOUT exception.")
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.terminate()
def test_exp_timeout_notThrown (self):
'''Verify that a TIMEOUT is not thrown when we match what we expect.'''
try:
p = wexpect.spawn('cat')
p.sendline('Hello')
p.expect('Hello')
except wexpect.TIMEOUT:
self.fail("TIMEOUT caught when it shouldn't be raised because we match the proper pattern.")
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.terminate()
def test_stacktraceMunging (self):
'''Verify that the stack trace returned with a TIMEOUT instance does not contain references to wexpect.'''
try:
p = wexpect.spawn('cat')
p.sendline('Hello')
p.expect('Goodbye',timeout=5)
except wexpect.TIMEOUT:
err = sys.exc_info()[1]
if err.get_trace().count("wexpect/__init__.py") != 0:
self.fail("The TIMEOUT get_trace() referenced wexpect.py. "
"It should only reference the caller.\n" + err.get_trace())
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.terminate()
def test_correctStackTrace (self):
'''Verify that the stack trace returned with a TIMEOUT instance correctly handles function calls.'''
def nestedFunction (spawnInstance):
spawnInstance.expect("junk", timeout=3)
try:
p = wexpect.spawn('cat')
p.sendline('Hello')
nestedFunction(p)
except wexpect.TIMEOUT:
err = sys.exc_info()[1]
if err.get_trace().count("nestedFunction") == 0:
self.fail("The TIMEOUT get_trace() did not show the call "
"to the nestedFunction function.\n" + str(err) + "\n"
+ err.get_trace())
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.terminate()
if __name__ == '__main__':
unittest.main()
suite = unittest.makeSuite(Exp_TimeoutTestCase,'test')

View File

@ -10,16 +10,6 @@ import wexpect.wexpect_util as wexpect_util
logger = logging.getLogger('wexpect')
logger.info('Hello')
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected.')
def main():
try:
parser = argparse.ArgumentParser(description='Wexpect: executable automation for Windows.')
@ -61,7 +51,7 @@ def main():
path=command, host_pid=args.host_pid, codepage=args.codepage, port=args.port,
window_size_x=args.window_size_x, window_size_y=args.window_size_y,
buffer_size_x=args.buffer_size_x, buffer_size_y=args.buffer_size_y,
local_echo=str2bool(args.local_echo), interact=str2bool(args.interact))
local_echo=wexpect_util.str2bool(args.local_echo), interact=wexpect_util.str2bool(args.interact))
logger.info(f'Exiting with status: {cons.child_exitstatus}')
sys.exit(cons.child_exitstatus)

View File

@ -463,8 +463,9 @@ class ConsoleReaderSocket(ConsoleReaderBase):
self.connection, client_address = self.sock.accept()
self.connection.settimeout(.01)
logger.info(f'Client connected: {client_address}')
except Exception:
logger.error(f"Port: {self.port}")
except Exception as e: # pragma: no cover
# I hope this code is unreachable.
logger.error(f"Port: {self.port} {e}")
raise
def close_connection(self):

View File

@ -285,9 +285,10 @@ class SpawnBase:
self.disconnect_from_child()
if self.safe_exit:
self.wait()
except Exception:
traceback.print_exc()
logger.warning(traceback.format_exc())
except Exception: # pragma: no cover
# I hope this code is unreachable...
logger.error(traceback.format_exc())
raise
def __str__(self):
"""This returns a human-readable string that represents the state of
@ -354,11 +355,13 @@ class SpawnBase:
# Deep copy needed to prevent cycle-to-cycle growth. See #31 for more details.
environ = os.environ.copy()
if getattr(sys, 'frozen', False):
if getattr(sys, 'frozen', False): # pragma: no cover
'''Runing in a PyInstaller bundle:
Pyinstaller has no explicit python interpreter, so console-reader should be bundled
also, and host should call it as a windows executable.
This code cannot be covered during tests, because it runs only in bundled way.
https://pyinstaller.readthedocs.io/en/stable/runtime-information.html#using-sys-executable-and-sys-argv-0
https://github.com/pyinstaller/pyinstaller/issues/822
'''
@ -428,7 +431,7 @@ class SpawnBase:
if not self.isalive(timeout = self.delayafterterminate):
return True
return False
raise ExceptionPexpect("Child has not been terminated even after it was killed.")
def isalive(self, trust_console=True, timeout=0):
"""True if the child is still alive, false otherwise"""
@ -522,7 +525,7 @@ class SpawnBase:
return self
def read_nonblocking(self, size=1):
def read_nonblocking(self, size=1): # pragma: no cover
"""Virtual definition
"""
raise NotImplementedError
@ -610,17 +613,17 @@ class SpawnBase:
return self._send_impl(s)
def _send_impl(self, s):
def _send_impl(self, s): # pragma: no cover
"""Virtual definition
"""
raise NotImplementedError
def connect_to_child(self):
def connect_to_child(self): # pragma: no cover
"""Virtual definition
"""
raise NotImplementedError
def disconnect_from_child(self):
def disconnect_from_child(self): # pragma: no cover
"""Virtual definition
"""
raise NotImplementedError

View File

@ -27,7 +27,20 @@ SPAM = 5
logging.addLevelName(SPAM, "SPAM")
def spam(self, message, *args, **kws):
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else: # pragma: no cover
raise argparse.ArgumentTypeError('Boolean value expected.')
def spam(self, message, *args, **kws): # pragma: no cover
'''Very verbose debug dunction.
'''
if self.isEnabledFor(SPAM):
# Yes, logger takes its '*args' as 'args'.
self._log(SPAM, message, args, **kws)
@ -36,7 +49,9 @@ def spam(self, message, *args, **kws):
logging.Logger.spam = spam
def init_logger(logger=None):
def init_logger(logger=None): # pragma: no cover
'''Initializes the logger. I wont measure coverage for this debug method.
'''
if logger is None:
logger = logging.getLogger('wexpect')
try: