Merge pull request #38 from raczben/test

Test
This commit is contained in:
Benedek Racz 2020-04-05 23:01:42 +02:00 committed by GitHub
commit 4e38fd3f4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 122 additions and 66 deletions

View File

@ -8,7 +8,6 @@ import os
import wexpect
from tests import PexpectTestCase
from .utils import no_coverage_env
# Many of these test cases blindly assume that sequential directory
# listings of the /bin directory will yield the same results.
@ -149,6 +148,10 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
wexpect.EOF])
self.assertEqual(index, 3, (index, p.before, p.after))
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def test_expect_index (self):
'''This tests that mixed list of regex strings, TIMEOUT, and EOF all
return the correct index when matched.
@ -162,6 +165,9 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
p = wexpect.spawn('cat', timeout=5, echo=False)
p.expect = p.expect_exact
self._expect_index(p)
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def _expect_index (self, p):
p.sendline ('1234')
@ -219,6 +225,10 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
the_old_way = the_old_way.replace('\r\n', '\n'
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
self.assertEqual(the_old_way, the_new_way)
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
p = wexpect.spawn('echo hello.?world')
i = p.expect_exact('.?')
self.assertEqual(p.before, 'hello')
@ -237,6 +247,10 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
self.assertEqual(the_old_way, the_new_way)
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def test_expect_timeout (self):
p = wexpect.spawn('cat', timeout=5)
p.expect(wexpect.TIMEOUT) # This tells it to wait for timeout.
@ -251,6 +265,9 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
else:
self.fail ('Expected an EOF exception.')
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def test_buffer_interface(self):
p = wexpect.spawn('cat', timeout=5)
p.sendline ('Hello')
@ -258,6 +275,8 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
assert len(p.buffer)
p.buffer = 'Testing'
p.sendeof ()
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def _before_after(self, p):
p.timeout = 5
@ -276,17 +295,20 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
assert p.before.startswith(', 51, 52'), p.before[:20]
assert p.before.endswith(', 99]\r\n'), p.before[-20:]
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def test_before_after(self):
'''This tests expect() for some simple before/after things.
'''
p = wexpect.spawn('%s -Wi list100.py' % PYTHONBINQUOTE, env=no_coverage_env())
p = wexpect.spawn('%s -Wi list100.py' % PYTHONBINQUOTE)
self._before_after(p)
def test_before_after_exact(self):
'''This tests some simple before/after things, for
expect_exact(). (Grahn broke it at one point.)
'''
p = wexpect.spawn('%s -Wi list100.py' % PYTHONBINQUOTE, env=no_coverage_env())
p = wexpect.spawn('%s -Wi list100.py' % PYTHONBINQUOTE)
# mangle the spawn so we test expect_exact() instead
p.expect = p.expect_exact
self._before_after(p)
@ -314,6 +336,8 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
p.sendline('list(range(4*5))')
self.assertEqual(p.expect(['12,', '2,']), 1)
p.sendline('exit()')
def test_ordering(self):
'''This tests expect() for which pattern is returned
when many may eventually match. I (Grahn) am a bit
@ -334,6 +358,10 @@ class ExpectTestCase (PexpectTestCase.PexpectTestCase):
p.expect = p.expect_exact
self._ordering(p)
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
def _greed(self, expect):
# End at the same point: the one with the earliest start should win
self.assertEqual(expect(['3, 4', '2, 3, 4']), 1)

View File

@ -24,6 +24,7 @@ class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase):
self._test(['a'], range(1,200), [1], [0])
self.p.terminate()
def test_random(self):
@ -45,6 +46,8 @@ class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase):
self._test(['a', 'b', 'c'], [16], [16], [-1, 0, 1])
self._test(['a', 'b', 'c'], [16, 32, 64], [16, 32, 64], [-1, 0])
self.p.terminate()
@unittest.skipIf(wexpect.spawn_class_name == 'legacy_wexpect', "legacy has bug around refreshing long consoles")
def test_long_console(self):
@ -64,6 +67,8 @@ class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase):
self._test(['a', 'b', 'c', 'd', 'e', 'f'], [8, 16, 32, 64], [64, 128, 256], [-1, 0])
self.p.terminate()
def _test(self, character_list, character_count_list, line_count_list, speed_ms_list):
# print(f'character_list: {character_list} character_count_list: {character_count_list} line_count_list: {line_count_list} speed_ms_list: {speed_ms_list}')

View File

@ -19,7 +19,7 @@ class ReadLineTestCase(PexpectTestCase.PexpectTestCase):
fooPath = python_executable + ' ' + child_script
prompt = ': '
num = 5
# Start the child process
p = wexpect.spawn(fooPath)
# Wait for prompt
@ -28,27 +28,39 @@ class ReadLineTestCase(PexpectTestCase.PexpectTestCase):
p.expect('Bye!\r\n')
expected_lines = p.before.splitlines(True) # Keep the line end
expected_lines += [p.match.group()]
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
# Start the child process
p = wexpect.spawn(fooPath)
# Wait for prompt
p.expect(prompt)
p.sendline(str(num))
for i in range(num +2): # +1 the line of sendline +1: Bye
line = p.readline()
self.assertEqual(expected_lines[i], line)
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
# Start the child process
p = wexpect.spawn(fooPath)
# Wait for prompt
p.expect(prompt)
p.sendline(str(num))
readlines_lines = p.readlines()
self.assertEqual(expected_lines, readlines_lines)
# Termination of the SpawnSocket is slow. We have to wait to prevent the failure of the next test.
if wexpect.spawn_class_name == 'SpawnSocket':
p.wait()
if __name__ == '__main__':
unittest.main()

42
tox.ini
View File

@ -3,18 +3,17 @@
[tox]
# The following configuration will run automatically.
envlist = py{37}-{default,legacy_wexpect,spawn_pipe},installed,pyinstaller
envlist = py{37}-{legacy_wexpect,spawn_pipe,spawn_socket},installed,pyinstaller
[testenv]
description = Unit tests
passenv =
WEXPECT_*
# Set environment variables to select the proper configuration for each envirnment.
setenv =
spawn_pipe: WEXPECT_SPAWN_CLASS=SpawnPipe
legacy_wexpect: WEXPECT_SPAWN_CLASS=legacy_wexpect
spawn_socket: WEXPECT_SPAWN_CLASS=SpawnSocket
commands =
# install the dependencies:
@ -33,48 +32,47 @@ commands =
# https://tox.readthedocs.io/en/latest/config.html#environment-variable-substitutions
coverage xml --omit=tests/*,site-packages -o {env:TOX_ENV_NAME}_coverage.xml
[testenv:installed]
# normal tests test the cloned files. This testenv tests the installation itself.
description = Unit tests installed
# normal tests test the cloned files. This testenv tests the installation itself, with the default
# spawn class.
description = Unit tests installed, and default spawn class.
changedir = test_01_installed
whitelist_externals =
whitelist_externals =
cp
# Appveyor will set the WEXPECT_SPAWN_CLASS to run the proper configuration for each run.
passenv =
WEXPECT_*
commands =
# copy all testcase into working dir
cp -r ../tests tests
# Run the test itself
python -m unittest
# Run the test itself. Running all tests is not needed, because it just test the installation,
# not functions.
python -m unittest tests.test_misc
[testenv:pyinstaller]
# Test if wexpect working with pyinstaller. See #12 for more details.
description = Unit tests pyinstaller
whitelist_externals =
whitelist_externals =
pyinstaller
pyinstaller_test
setenv =
WEXPECT_SPAWN_CLASS=SpawnPipe
commands =
# install the dependencies:
pip install .[test]
# create wexpect executable for console-reader.
pyinstaller wexpect.spec
# create test executable, to thest the console-reader
pyinstaller tests/pyinstaller_test.py
# run test:
dist\pyinstaller_test\pyinstaller_test.exe

View File

@ -63,6 +63,7 @@ def main():
buffer_size_x=args.buffer_size_x, buffer_size_y=args.buffer_size_y,
local_echo=str2bool(args.local_echo), interact=str2bool(args.interact))
logger.info(f'Exiting with status: {cons.child_exitstatus}')
sys.exit(cons.child_exitstatus)
except Exception as e: # pragma: no cover

View File

@ -469,7 +469,9 @@ class ConsoleReaderSocket(ConsoleReaderBase):
def close_connection(self):
if self.connection:
self.connection.shutdown(socket.SHUT_RDWR)
self.connection.close()
self.connection = None
def send_to_host(self, msg):
# convert to bytes

View File

@ -411,7 +411,7 @@ class SpawnBase:
if force or self.console_process is None:
self.child_process = self.get_console_process()
self.child_pid = self.child_process.pid
return self.child_process
return self.child_process
def close(self): # File-like object.
""" Closes the child console."""
@ -425,13 +425,12 @@ class SpawnBase:
return True
self.kill()
time.sleep(self.delayafterterminate)
if not self.isalive():
if not self.isalive(timeout = self.delayafterterminate):
return True
return False
def isalive(self, trust_console=True):
def isalive(self, trust_console=True, timeout=0):
"""True if the child is still alive, false otherwise"""
if trust_console:
if self.flag_eof:
@ -441,20 +440,30 @@ class SpawnBase:
# Child process has not been started... Not alive
return False
if self.exitstatus is not None:
return False
try:
self.exitstatus = self.child_process.wait(timeout=0)
self.exitstatus = self.child_process.wait(timeout=timeout)
logger.info(f'exitstatus: {self.exitstatus}')
return False
except psutil.TimeoutExpired:
return True
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
try:
self.child_process.send_signal(sig)
except psutil.NoSuchProcess as e:
logger.info('Child has already died. %s', e)
logger.info(f'Sending kill signal: {sig}')
self.send(SIGNAL_CHARS[sig])
self.terminated = True
except EOF as e:
logger.info(e)
def wait(self, child=True, console=False):
if self.exitstatus is not None:
return self.exitstatus
if child:
self.exitstatus = self.child_process.wait()
logger.info(f'exitstatus: {self.exitstatus}')
@ -589,6 +598,10 @@ class SpawnBase:
def send(self, s, delaybeforesend=None):
"""Virtual definition
"""
if self.flag_eof:
logger.info('EOF: End of file has been already detected.')
raise EOF('End of file has been already detected.')
if delaybeforesend is None:
delaybeforesend = self.delaybeforesend
@ -814,6 +827,8 @@ class SpawnBase:
self.match_index = index
return self.match_index
# No match at this point
if self.flag_eof:
raise EOF('EOF flag has been raised.')
if timeout is not None and end_time < time.time():
logger.info('Timeout exceeded in expect_any().')
raise TIMEOUT('Timeout exceeded in expect_any().')
@ -835,8 +850,8 @@ class SpawnBase:
else:
self.match = None
self.match_index = None
logger.info(f'EOF: {e}\n{self}')
raise EOF(f'{e}\n{self}')
logger.info('Raise EOF again')
raise
except TIMEOUT as e:
self.buffer = incoming
self.before = incoming
@ -875,7 +890,7 @@ class SpawnPipe(SpawnBase):
# Sets delay in terminate() method to allow kernel time to update process status. Time in
# seconds.
self.delayafterterminate = 1
self.delayafterterminate = 2
def connect_to_child(self):
pipe_name = 'wexpect_{}'.format(self.console_pid)
@ -932,10 +947,10 @@ class SpawnPipe(SpawnBase):
else:
logger.spam(f'Readed: {s}')
if b'\x04' in s:
if EOF_CHAR in s:
self.flag_eof = True
logger.info("EOF: EOF character has been arrived")
raise EOF('EOF character has been arrived')
s = s.split(EOF_CHAR)[0]
return s.decode()
except pywintypes.error as e:
@ -966,26 +981,19 @@ class SpawnPipe(SpawnBase):
except pywintypes.error as e:
if e.args[0] == winerror.ERROR_BROKEN_PIPE: # 109
logger.info("EOF: broken pipe, bye bye")
self.flag_eof = True
raise EOF("broken pipe, bye bye")
elif e.args[0] == winerror.ERROR_NO_DATA:
'''232 (0xE8)
The pipe is being closed.
'''
logger.info("The pipe is being closed.")
self.flag_eof = True
raise EOF("The pipe is being closed.")
else:
raise
return len(s)
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
try:
logger.info(f'Sending kill signal: {sig}')
self.send(SIGNAL_CHARS[sig])
self.terminated = True
except EOF as e:
logger.info(e)
class SpawnSocket(SpawnBase):
@ -1005,7 +1013,7 @@ class SpawnSocket(SpawnBase):
# Sets delay in terminate() method to allow kernel time to update process status. Time in
# seconds.
self.delayafterterminate = 1
self.delayafterterminate = 2
def _send_impl(self, s):
"""This sends a string to the child process. This returns the number of
@ -1013,8 +1021,16 @@ class SpawnSocket(SpawnBase):
the log. """
if isinstance(s, str):
s = str.encode(s)
self.sock.sendall(s)
return len(s)
try:
if s:
logger.debug(f"Writing: {s}")
self.sock.sendall(s)
logger.spam(f"WriteFile finished.")
return len(s)
except ConnectionResetError as e:
logger.info("ConnectionResetError")
self.flag_eof = True
raise EOF("ConnectionResetError")
def connect_to_child(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -1022,7 +1038,9 @@ class SpawnSocket(SpawnBase):
self.sock.settimeout(.2)
def disconnect_from_child(self):
logger.info('disconnect_from_child')
if self.sock:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
self.sock = None
@ -1052,7 +1070,7 @@ class SpawnSocket(SpawnBase):
if EOF_CHAR in s:
self.flag_eof = True
logger.info("EOF: EOF character has been arrived")
raise EOF('EOF character has been arrived')
s = s.split(EOF_CHAR)[0]
except ConnectionResetError:
self.flag_eof = True
@ -1063,14 +1081,6 @@ class SpawnSocket(SpawnBase):
return s.decode()
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
try:
logger.info(f'Sending kill signal: {sig}')
self.send(SIGNAL_CHARS[sig])
except EOF as e:
logger.info(e)
class searcher_re (object):
"""This is regular expression string search helper for the