mirror of
https://github.com/clearml/wexpect-venv
synced 2025-01-31 02:46:59 +00:00
[ADD] console reader coverage
This commit is contained in:
parent
ec3f7ea2e8
commit
c5ded637ce
333
tests/test_misc_console_coverage.py
Normal file
333
tests/test_misc_console_coverage.py
Normal file
@ -0,0 +1,333 @@
|
||||
#!/usr/bin/env python
|
||||
'''
|
||||
wexpect LICENSE
|
||||
|
||||
This license is approved by the OSI and FSF as GPL-compatible.
|
||||
http://opensource.org/licenses/isc-license.txt
|
||||
|
||||
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
|
||||
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
|
||||
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
|
||||
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
'''
|
||||
import unittest
|
||||
import sys
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
|
||||
import wexpect
|
||||
from tests import PexpectTestCase
|
||||
|
||||
# the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent
|
||||
_CAT_EOF = '^D\x08\x08'
|
||||
|
||||
def _u(s):
|
||||
return s
|
||||
|
||||
PYBIN = '"{}"'.format(sys.executable)
|
||||
|
||||
class TestCaseMisc(PexpectTestCase.PexpectTestCase):
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_isatty(self):
|
||||
" Test isatty() is True after spawning process on most platforms. "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
if not child.isatty() and sys.platform.lower().startswith('sunos'):
|
||||
if hasattr(unittest, 'SkipTest'):
|
||||
raise unittest.SkipTest("Not supported on this platform.")
|
||||
return 'skip'
|
||||
assert child.isatty()
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_read(self):
|
||||
" Test spawn.read by calls of various size. "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
child.sendline("abc")
|
||||
child.sendeof()
|
||||
child.readline()
|
||||
self.assertEqual(child.read(0), '')
|
||||
self.assertEqual(child.read(1), 'a')
|
||||
self.assertEqual(child.read(1), 'b')
|
||||
self.assertEqual(child.read(1), 'c')
|
||||
self.assertEqual(child.read(2), '\r\n')
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_readline_bin_echo(self):
|
||||
" Test spawn('echo'). "
|
||||
# given,
|
||||
child = wexpect.spawn('echo', ['alpha', 'beta'], coverage_console_reader=True)
|
||||
|
||||
# exercise,
|
||||
self.assertEqual(child.readline(), 'alpha beta\r\n')
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_readline(self):
|
||||
" Test spawn.readline(). "
|
||||
# when argument 0 is sent, nothing is returned.
|
||||
# Otherwise the argument value is meaningless.
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
child.sendline("alpha")
|
||||
child.sendline("beta")
|
||||
child.sendline("gamma")
|
||||
child.sendline("delta")
|
||||
child.sendeof()
|
||||
self.assertEqual(child.readline(0), '')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline().rstrip(), 'alpha')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline(1).rstrip(), 'beta')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline(2).rstrip(), 'gamma')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline().rstrip(), 'delta')
|
||||
child.expect(wexpect.EOF)
|
||||
if type(child).__name__ in ['SpawnPipe', 'SpawnSocket']:
|
||||
time.sleep(child.delayafterterminate)
|
||||
assert not child.isalive(trust_console=False)
|
||||
else:
|
||||
assert not child.isalive()
|
||||
self.assertEqual(child.exitstatus, 0)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_iter(self):
|
||||
" iterating over lines of spawn.__iter__(). "
|
||||
child = wexpect.spawn('echo "abc\r\n123"', coverage_console_reader=True)
|
||||
# Don't use ''.join() because we want to test __iter__().
|
||||
page = ''
|
||||
for line in child:
|
||||
page += line
|
||||
page = page.replace(_CAT_EOF, '')
|
||||
self.assertEqual(page, 'abc\r\n123\r\n')
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_readlines(self):
|
||||
" reading all lines of spawn.readlines(). "
|
||||
child = wexpect.spawn('cat', echo=False, coverage_console_reader=True)
|
||||
child.sendline("abc")
|
||||
child.sendline("123")
|
||||
child.sendeof()
|
||||
page = ''.join(child.readlines()).replace(_CAT_EOF, '')
|
||||
self.assertEqual(page, '\r\nabc\r\n\r\n123\r\n')
|
||||
child.expect(wexpect.EOF)
|
||||
if type(child).__name__ in ['SpawnPipe', 'SpawnSocket']:
|
||||
time.sleep(child.delayafterterminate)
|
||||
assert not child.isalive(trust_console=False)
|
||||
else:
|
||||
assert not child.isalive()
|
||||
self.assertEqual(child.exitstatus, 0)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_write(self):
|
||||
" write a character and return it in return. "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
child.write('a')
|
||||
child.write('\r')
|
||||
child.readline()
|
||||
self.assertEqual(child.readline(), 'a\r\n')
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_writelines(self):
|
||||
" spawn.writelines() "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
# notice that much like file.writelines, we do not delimit by newline
|
||||
# -- it is equivalent to calling write(''.join([args,]))
|
||||
child.writelines(['abc', '123', 'xyz', '\r'])
|
||||
child.sendeof()
|
||||
child.readline()
|
||||
line = child.readline()
|
||||
self.assertEqual(line, 'abc123xyz\r\n')
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_eof(self):
|
||||
" call to expect() after EOF is received raises wexpect.EOF "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
child.sendeof()
|
||||
with self.assertRaises(wexpect.EOF):
|
||||
child.expect('the unexpected')
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_with(self):
|
||||
"spawn can be used as a context manager"
|
||||
with wexpect.spawn(PYBIN + ' echo_w_prompt.py', coverage_console_reader=True) as p:
|
||||
p.expect('<in >')
|
||||
p.sendline('alpha')
|
||||
p.expect('<out>alpha')
|
||||
assert p.isalive()
|
||||
|
||||
assert not p.isalive()
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_terminate(self):
|
||||
" test force terminate always succeeds (SIGKILL). "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
child.terminate(force=1)
|
||||
assert child.terminated
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_bad_arguments_suggest_fdpsawn(self):
|
||||
" assert custom exception for spawn(int). "
|
||||
expect_errmsg = "maybe you want to use fdpexpect.fdspawn"
|
||||
with self.assertRaisesRegex(wexpect.ExceptionPexpect,
|
||||
".*" + expect_errmsg):
|
||||
wexpect.spawn(1, coverage_console_reader=True)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_bad_arguments_second_arg_is_list(self):
|
||||
" Second argument to spawn, if used, must be only a list."
|
||||
with self.assertRaises(TypeError):
|
||||
wexpect.spawn('ls', '-la', coverage_console_reader=True)
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
# not even a tuple,
|
||||
wexpect.spawn('ls', ('-la',), coverage_console_reader=True)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_read_after_close_raises_value_error(self):
|
||||
" Calling read_nonblocking after close raises ValueError. "
|
||||
# as read_nonblocking underlies all other calls to read,
|
||||
# ValueError should be thrown for all forms of read.
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
p.close()
|
||||
p.read_nonblocking()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
p.close()
|
||||
p.read()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
p.close()
|
||||
p.readline()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
p.close()
|
||||
p.readlines()
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_isalive(self):
|
||||
" check isalive() before and after EOF. (True, False) "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
self.assertTrue(child.isalive())
|
||||
child.sendeof()
|
||||
child.expect(wexpect.EOF)
|
||||
assert child.isalive() is False
|
||||
self.assertFalse(child.isalive())
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_bad_type_in_expect(self):
|
||||
" expect() does not accept dictionary arguments. "
|
||||
child = wexpect.spawn('cat', coverage_console_reader=True)
|
||||
with self.assertRaises(TypeError):
|
||||
child.expect({})
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_cwd(self):
|
||||
" check keyword argument `cwd=' of wexpect.run() "
|
||||
try:
|
||||
os.mkdir('cwd_tmp')
|
||||
except:
|
||||
pass
|
||||
tmp_dir = os.path.realpath('cwd_tmp')
|
||||
child = wexpect.spawn('cmd', coverage_console_reader=True)
|
||||
child.expect('>')
|
||||
child.sendline('cd')
|
||||
child.expect('>')
|
||||
default = child.before.splitlines()[1]
|
||||
child.terminate()
|
||||
child = wexpect.spawn('cmd', cwd=tmp_dir, coverage_console_reader=True)
|
||||
child.expect('>')
|
||||
child.sendline('cd')
|
||||
child.expect('>')
|
||||
pwd_tmp = child.before.splitlines()[1]
|
||||
child.terminate()
|
||||
self.assertNotEqual(default, pwd_tmp)
|
||||
self.assertEqual(tmp_dir, _u(pwd_tmp))
|
||||
|
||||
def _test_searcher_as(self, searcher, plus=None):
|
||||
# given,
|
||||
given_words = ['alpha', 'beta', 'gamma', 'delta', ]
|
||||
given_search = given_words
|
||||
if searcher == wexpect.searcher_re:
|
||||
given_search = [re.compile(word) for word in given_words]
|
||||
if plus is not None:
|
||||
given_search = given_search + [plus]
|
||||
search_string = searcher(given_search)
|
||||
basic_fmt = '\n {0}: {1}'
|
||||
fmt = basic_fmt
|
||||
if searcher is wexpect.searcher_re:
|
||||
fmt = '\n {0}: re.compile({1})'
|
||||
expected_output = '{0}:'.format(searcher.__name__)
|
||||
idx = 0
|
||||
for word in given_words:
|
||||
expected_output += fmt.format(idx, '"{0}"'.format(word))
|
||||
idx += 1
|
||||
if plus is not None:
|
||||
if plus == wexpect.EOF:
|
||||
expected_output += basic_fmt.format(idx, 'EOF')
|
||||
elif plus == wexpect.TIMEOUT:
|
||||
expected_output += basic_fmt.format(idx, 'TIMEOUT')
|
||||
|
||||
# exercise,
|
||||
self.assertEqual(search_string.__str__(), expected_output)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_searcher_as_string(self):
|
||||
" check searcher_string(..).__str__() "
|
||||
self._test_searcher_as(wexpect.searcher_string)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_searcher_as_string_with_EOF(self):
|
||||
" check searcher_string(..).__str__() that includes EOF "
|
||||
self._test_searcher_as(wexpect.searcher_string, plus=wexpect.EOF)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_searcher_as_string_with_TIMEOUT(self):
|
||||
" check searcher_string(..).__str__() that includes TIMEOUT "
|
||||
self._test_searcher_as(wexpect.searcher_string, plus=wexpect.TIMEOUT)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_searcher_re_as_string(self):
|
||||
" check searcher_re(..).__str__() "
|
||||
self._test_searcher_as(wexpect.searcher_re)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_searcher_re_as_string_with_EOF(self):
|
||||
" check searcher_re(..).__str__() that includes EOF "
|
||||
self._test_searcher_as(wexpect.searcher_re, plus=wexpect.EOF)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_searcher_re_as_string_with_TIMEOUT(self):
|
||||
" check searcher_re(..).__str__() that includes TIMEOUT "
|
||||
self._test_searcher_as(wexpect.searcher_re, plus=wexpect.TIMEOUT)
|
||||
|
||||
@unittest.skipIf(hasattr(wexpect, 'legacy_wexpect'), "legacy unsupported")
|
||||
def test_exception_tb(self):
|
||||
" test get_trace() filters away wexpect/__init__.py calls. "
|
||||
p = wexpect.spawn('sleep 1', coverage_console_reader=True)
|
||||
try:
|
||||
p.expect('BLAH')
|
||||
except wexpect.ExceptionPexpect as e:
|
||||
# get_trace should filter out frames in wexpect's own code
|
||||
tb = e.get_trace()
|
||||
# exercise,
|
||||
assert 'raise ' not in tb
|
||||
assert 'wexpect/__init__.py' not in tb
|
||||
else:
|
||||
assert False, "Should have raised an exception."
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
suite = unittest.makeSuite(TestCaseMisc,'test')
|
7
tox.ini
7
tox.ini
@ -21,7 +21,10 @@ commands =
|
||||
pip install .[test]
|
||||
|
||||
# Run the test itself
|
||||
coverage run -m unittest
|
||||
coverage run --parallel-mode -m unittest
|
||||
|
||||
# Combine the parallel results
|
||||
coverage combine
|
||||
|
||||
# Dump coverage summary to console
|
||||
coverage report --omit=tests/*,site-packages/*
|
||||
@ -74,4 +77,4 @@ commands =
|
||||
|
||||
# run test:
|
||||
dist\pyinstaller_test\pyinstaller_test.exe
|
||||
|
||||
|
||||
|
@ -210,7 +210,7 @@ def run(command, timeout=-1, withexitstatus=False, events=None, extra_args=None,
|
||||
class SpawnBase:
|
||||
def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None,
|
||||
logfile=None, cwd=None, env=None, codepage=None, echo=True, safe_exit=True,
|
||||
interact=False, **kwargs):
|
||||
interact=False, coverage_console_reader=False, **kwargs):
|
||||
"""This starts the given command in a child process. This does all the
|
||||
fork/exec type of stuff for a pty. This is called by __init__. If args
|
||||
is empty then command will be parsed (split on spaces) and args will be
|
||||
@ -250,6 +250,7 @@ class SpawnBase:
|
||||
self.cwd = cwd
|
||||
self.env = env
|
||||
self.echo = echo
|
||||
self.coverage_console_reader = coverage_console_reader
|
||||
self.maxread = maxread # max bytes to read at one time into buffer
|
||||
# delaybeforesend: Sets sleep time used just before sending data to child. Time in seconds.
|
||||
self.delaybeforesend = 0.1
|
||||
@ -410,6 +411,9 @@ class SpawnBase:
|
||||
pyargs = ['-m']
|
||||
python_executable = sys.executable
|
||||
|
||||
if self.coverage_console_reader:
|
||||
pyargs = ['-m', 'coverage', 'run', '--parallel-mode', '-m']
|
||||
|
||||
# add current location to PYTHONPATH environment variable to be able to start the child.
|
||||
python_path = environ.get('PYTHONPATH', '')
|
||||
spath = ';'.join(spath)
|
||||
@ -897,7 +901,7 @@ class SpawnPipe(SpawnBase):
|
||||
super().__init__(
|
||||
command=command, args=args, timeout=timeout, maxread=maxread,
|
||||
searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo,
|
||||
interact=interact)
|
||||
interact=interact, **kwargs)
|
||||
|
||||
# Sets delay in terminate() method to allow kernel time to update process status. Time in
|
||||
# seconds.
|
||||
@ -1017,7 +1021,7 @@ class SpawnSocket(SpawnBase):
|
||||
|
||||
def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None,
|
||||
logfile=None, cwd=None, env=None, codepage=None, echo=True, port=4321,
|
||||
host='127.0.0.1', interact=False):
|
||||
host='127.0.0.1', interact=False, **kwargs):
|
||||
self.port = port
|
||||
self.host = host
|
||||
self.sock = None
|
||||
@ -1027,7 +1031,7 @@ class SpawnSocket(SpawnBase):
|
||||
super().__init__(
|
||||
command=command, args=args, timeout=timeout, maxread=maxread,
|
||||
searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo,
|
||||
interact=interact)
|
||||
interact=interact, **kwargs)
|
||||
|
||||
# Sets delay in terminate() method to allow kernel time to update process status. Time in
|
||||
# seconds.
|
||||
|
Loading…
Reference in New Issue
Block a user