2019-12-05 15:42:49 +00:00
|
|
|
import unittest
|
|
|
|
import sys
|
|
|
|
import re
|
|
|
|
import os
|
2020-01-31 14:49:48 +00:00
|
|
|
import time
|
2019-12-05 15:42:49 +00:00
|
|
|
|
|
|
|
import wexpect
|
2020-02-22 22:03:28 +00:00
|
|
|
from tests import PexpectTestCase
|
2019-12-05 15:42:49 +00:00
|
|
|
|
|
|
|
# the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent
|
|
|
|
_CAT_EOF = '^D\x08\x08'
|
|
|
|
|
|
|
|
def _u(s):
|
|
|
|
return s
|
|
|
|
|
|
|
|
PYBIN = '"{}"'.format(sys.executable)
|
|
|
|
|
|
|
|
class TestCaseMisc(PexpectTestCase.PexpectTestCase):
|
|
|
|
|
2020-04-06 11:07:49 +00:00
|
|
|
def test_wrong_command(self):
|
|
|
|
"Test raise of exception in case of wrong program"
|
|
|
|
with self.assertRaisesRegex(wexpect.ExceptionPexpect,"The command was not found.+"):
|
|
|
|
child = wexpect.spawn('unexecutable')
|
|
|
|
|
2019-12-05 15:42:49 +00:00
|
|
|
def test_isatty(self):
|
|
|
|
" Test isatty() is True after spawning process on most platforms. "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
assert child.isatty()
|
|
|
|
|
|
|
|
def test_read(self):
|
|
|
|
" Test spawn.read by calls of various size. "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
child.sendline("abc")
|
|
|
|
child.sendeof()
|
|
|
|
child.readline()
|
|
|
|
self.assertEqual(child.read(0), '')
|
|
|
|
self.assertEqual(child.read(1), 'a')
|
|
|
|
self.assertEqual(child.read(1), 'b')
|
|
|
|
self.assertEqual(child.read(1), 'c')
|
|
|
|
self.assertEqual(child.read(2), '\r\n')
|
|
|
|
|
|
|
|
def test_readline_bin_echo(self):
|
|
|
|
" Test spawn('echo'). "
|
|
|
|
# given,
|
|
|
|
child = wexpect.spawn('echo', ['alpha', 'beta'])
|
|
|
|
|
|
|
|
# exercise,
|
|
|
|
self.assertEqual(child.readline(), 'alpha beta\r\n')
|
|
|
|
|
|
|
|
def test_readline(self):
|
|
|
|
" Test spawn.readline(). "
|
|
|
|
# when argument 0 is sent, nothing is returned.
|
|
|
|
# Otherwise the argument value is meaningless.
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
child.sendline("alpha")
|
|
|
|
child.sendline("beta")
|
|
|
|
child.sendline("gamma")
|
|
|
|
child.sendline("delta")
|
|
|
|
child.sendeof()
|
|
|
|
self.assertEqual(child.readline(0), '')
|
|
|
|
child.readline().rstrip()
|
|
|
|
self.assertEqual(child.readline().rstrip(), 'alpha')
|
|
|
|
child.readline().rstrip()
|
|
|
|
self.assertEqual(child.readline(1).rstrip(), 'beta')
|
|
|
|
child.readline().rstrip()
|
|
|
|
self.assertEqual(child.readline(2).rstrip(), 'gamma')
|
|
|
|
child.readline().rstrip()
|
|
|
|
self.assertEqual(child.readline().rstrip(), 'delta')
|
|
|
|
child.expect(wexpect.EOF)
|
2020-01-31 14:49:48 +00:00
|
|
|
if type(child).__name__ in ['SpawnPipe', 'SpawnSocket']:
|
|
|
|
time.sleep(child.delayafterterminate)
|
|
|
|
assert not child.isalive(trust_console=False)
|
|
|
|
else:
|
|
|
|
assert not child.isalive()
|
2019-12-05 15:42:49 +00:00
|
|
|
self.assertEqual(child.exitstatus, 0)
|
|
|
|
|
|
|
|
def test_iter(self):
|
|
|
|
" iterating over lines of spawn.__iter__(). "
|
|
|
|
child = wexpect.spawn('echo "abc\r\n123"')
|
|
|
|
# Don't use ''.join() because we want to test __iter__().
|
|
|
|
page = ''
|
|
|
|
for line in child:
|
|
|
|
page += line
|
|
|
|
page = page.replace(_CAT_EOF, '')
|
|
|
|
self.assertEqual(page, 'abc\r\n123\r\n')
|
|
|
|
|
|
|
|
def test_readlines(self):
|
|
|
|
" reading all lines of spawn.readlines(). "
|
2020-01-09 14:02:16 +00:00
|
|
|
child = wexpect.spawn('cat', echo=False)
|
2019-12-05 15:42:49 +00:00
|
|
|
child.sendline("abc")
|
|
|
|
child.sendline("123")
|
|
|
|
child.sendeof()
|
|
|
|
page = ''.join(child.readlines()).replace(_CAT_EOF, '')
|
|
|
|
self.assertEqual(page, '\r\nabc\r\n\r\n123\r\n')
|
|
|
|
child.expect(wexpect.EOF)
|
2020-01-31 14:49:48 +00:00
|
|
|
if type(child).__name__ in ['SpawnPipe', 'SpawnSocket']:
|
|
|
|
time.sleep(child.delayafterterminate)
|
|
|
|
assert not child.isalive(trust_console=False)
|
|
|
|
else:
|
|
|
|
assert not child.isalive()
|
2019-12-05 15:42:49 +00:00
|
|
|
self.assertEqual(child.exitstatus, 0)
|
|
|
|
|
|
|
|
def test_write(self):
|
|
|
|
" write a character and return it in return. "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
child.write('a')
|
|
|
|
child.write('\r')
|
|
|
|
child.readline()
|
|
|
|
self.assertEqual(child.readline(), 'a\r\n')
|
|
|
|
|
|
|
|
def test_writelines(self):
|
|
|
|
" spawn.writelines() "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
# notice that much like file.writelines, we do not delimit by newline
|
|
|
|
# -- it is equivalent to calling write(''.join([args,]))
|
|
|
|
child.writelines(['abc', '123', 'xyz', '\r'])
|
|
|
|
child.sendeof()
|
|
|
|
child.readline()
|
|
|
|
line = child.readline()
|
|
|
|
self.assertEqual(line, 'abc123xyz\r\n')
|
|
|
|
|
|
|
|
def test_eof(self):
|
|
|
|
" call to expect() after EOF is received raises wexpect.EOF "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
child.sendeof()
|
|
|
|
with self.assertRaises(wexpect.EOF):
|
|
|
|
child.expect('the unexpected')
|
|
|
|
|
|
|
|
def test_with(self):
|
|
|
|
"spawn can be used as a context manager"
|
|
|
|
with wexpect.spawn(PYBIN + ' echo_w_prompt.py') as p:
|
|
|
|
p.expect('<in >')
|
|
|
|
p.sendline('alpha')
|
|
|
|
p.expect('<out>alpha')
|
|
|
|
assert p.isalive()
|
|
|
|
|
|
|
|
assert not p.isalive()
|
|
|
|
|
|
|
|
def test_terminate(self):
|
|
|
|
" test force terminate always succeeds (SIGKILL). "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
child.terminate(force=1)
|
|
|
|
assert child.terminated
|
|
|
|
|
|
|
|
def test_bad_arguments_suggest_fdpsawn(self):
|
|
|
|
" assert custom exception for spawn(int). "
|
|
|
|
expect_errmsg = "maybe you want to use fdpexpect.fdspawn"
|
|
|
|
with self.assertRaisesRegex(wexpect.ExceptionPexpect,
|
|
|
|
".*" + expect_errmsg):
|
|
|
|
wexpect.spawn(1)
|
|
|
|
|
|
|
|
def test_bad_arguments_second_arg_is_list(self):
|
|
|
|
" Second argument to spawn, if used, must be only a list."
|
|
|
|
with self.assertRaises(TypeError):
|
|
|
|
wexpect.spawn('ls', '-la')
|
|
|
|
|
|
|
|
with self.assertRaises(TypeError):
|
|
|
|
# not even a tuple,
|
|
|
|
wexpect.spawn('ls', ('-la',))
|
|
|
|
|
|
|
|
def test_read_after_close_raises_value_error(self):
|
|
|
|
" Calling read_nonblocking after close raises ValueError. "
|
|
|
|
# as read_nonblocking underlies all other calls to read,
|
|
|
|
# ValueError should be thrown for all forms of read.
|
|
|
|
with self.assertRaises(ValueError):
|
|
|
|
p = wexpect.spawn('cat')
|
|
|
|
p.close()
|
|
|
|
p.read_nonblocking()
|
|
|
|
|
|
|
|
with self.assertRaises(ValueError):
|
|
|
|
p = wexpect.spawn('cat')
|
|
|
|
p.close()
|
|
|
|
p.read()
|
|
|
|
|
|
|
|
with self.assertRaises(ValueError):
|
|
|
|
p = wexpect.spawn('cat')
|
|
|
|
p.close()
|
|
|
|
p.readline()
|
|
|
|
|
|
|
|
with self.assertRaises(ValueError):
|
|
|
|
p = wexpect.spawn('cat')
|
|
|
|
p.close()
|
|
|
|
p.readlines()
|
|
|
|
|
|
|
|
def test_isalive(self):
|
|
|
|
" check isalive() before and after EOF. (True, False) "
|
|
|
|
child = wexpect.spawn('cat')
|
2020-01-25 15:59:19 +00:00
|
|
|
self.assertTrue(child.isalive())
|
2019-12-05 15:42:49 +00:00
|
|
|
child.sendeof()
|
|
|
|
child.expect(wexpect.EOF)
|
|
|
|
assert child.isalive() is False
|
2020-01-25 15:59:19 +00:00
|
|
|
self.assertFalse(child.isalive())
|
2019-12-05 15:42:49 +00:00
|
|
|
|
|
|
|
def test_bad_type_in_expect(self):
|
|
|
|
" expect() does not accept dictionary arguments. "
|
|
|
|
child = wexpect.spawn('cat')
|
|
|
|
with self.assertRaises(TypeError):
|
|
|
|
child.expect({})
|
|
|
|
|
|
|
|
def test_cwd(self):
|
|
|
|
" check keyword argument `cwd=' of wexpect.run() "
|
|
|
|
try:
|
|
|
|
os.mkdir('cwd_tmp')
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
tmp_dir = os.path.realpath('cwd_tmp')
|
|
|
|
child = wexpect.spawn('cmd')
|
|
|
|
child.expect('>')
|
|
|
|
child.sendline('cd')
|
|
|
|
child.expect('>')
|
|
|
|
default = child.before.splitlines()[1]
|
|
|
|
child.terminate()
|
|
|
|
child = wexpect.spawn('cmd', cwd=tmp_dir)
|
|
|
|
child.expect('>')
|
|
|
|
child.sendline('cd')
|
|
|
|
child.expect('>')
|
|
|
|
pwd_tmp = child.before.splitlines()[1]
|
|
|
|
child.terminate()
|
|
|
|
self.assertNotEqual(default, pwd_tmp)
|
|
|
|
self.assertEqual(tmp_dir, _u(pwd_tmp))
|
|
|
|
|
|
|
|
def _test_searcher_as(self, searcher, plus=None):
|
|
|
|
# given,
|
|
|
|
given_words = ['alpha', 'beta', 'gamma', 'delta', ]
|
|
|
|
given_search = given_words
|
|
|
|
if searcher == wexpect.searcher_re:
|
|
|
|
given_search = [re.compile(word) for word in given_words]
|
|
|
|
if plus is not None:
|
|
|
|
given_search = given_search + [plus]
|
|
|
|
search_string = searcher(given_search)
|
|
|
|
basic_fmt = '\n {0}: {1}'
|
|
|
|
fmt = basic_fmt
|
|
|
|
if searcher is wexpect.searcher_re:
|
|
|
|
fmt = '\n {0}: re.compile({1})'
|
|
|
|
expected_output = '{0}:'.format(searcher.__name__)
|
|
|
|
idx = 0
|
|
|
|
for word in given_words:
|
|
|
|
expected_output += fmt.format(idx, '"{0}"'.format(word))
|
|
|
|
idx += 1
|
|
|
|
if plus is not None:
|
|
|
|
if plus == wexpect.EOF:
|
|
|
|
expected_output += basic_fmt.format(idx, 'EOF')
|
|
|
|
elif plus == wexpect.TIMEOUT:
|
|
|
|
expected_output += basic_fmt.format(idx, 'TIMEOUT')
|
|
|
|
|
|
|
|
# exercise,
|
|
|
|
self.assertEqual(search_string.__str__(), expected_output)
|
|
|
|
|
|
|
|
def test_searcher_as_string(self):
|
|
|
|
" check searcher_string(..).__str__() "
|
|
|
|
self._test_searcher_as(wexpect.searcher_string)
|
|
|
|
|
|
|
|
def test_searcher_as_string_with_EOF(self):
|
|
|
|
" check searcher_string(..).__str__() that includes EOF "
|
|
|
|
self._test_searcher_as(wexpect.searcher_string, plus=wexpect.EOF)
|
|
|
|
|
|
|
|
def test_searcher_as_string_with_TIMEOUT(self):
|
|
|
|
" check searcher_string(..).__str__() that includes TIMEOUT "
|
|
|
|
self._test_searcher_as(wexpect.searcher_string, plus=wexpect.TIMEOUT)
|
|
|
|
|
|
|
|
def test_searcher_re_as_string(self):
|
|
|
|
" check searcher_re(..).__str__() "
|
|
|
|
self._test_searcher_as(wexpect.searcher_re)
|
|
|
|
|
|
|
|
def test_searcher_re_as_string_with_EOF(self):
|
|
|
|
" check searcher_re(..).__str__() that includes EOF "
|
|
|
|
self._test_searcher_as(wexpect.searcher_re, plus=wexpect.EOF)
|
|
|
|
|
|
|
|
def test_searcher_re_as_string_with_TIMEOUT(self):
|
|
|
|
" check searcher_re(..).__str__() that includes TIMEOUT "
|
|
|
|
self._test_searcher_as(wexpect.searcher_re, plus=wexpect.TIMEOUT)
|
|
|
|
|
|
|
|
def test_exception_tb(self):
|
|
|
|
" test get_trace() filters away wexpect/__init__.py calls. "
|
|
|
|
p = wexpect.spawn('sleep 1')
|
|
|
|
try:
|
|
|
|
p.expect('BLAH')
|
|
|
|
except wexpect.ExceptionPexpect as e:
|
|
|
|
# get_trace should filter out frames in wexpect's own code
|
|
|
|
tb = e.get_trace()
|
|
|
|
# exercise,
|
|
|
|
assert 'raise ' not in tb
|
|
|
|
assert 'wexpect/__init__.py' not in tb
|
|
|
|
else:
|
|
|
|
assert False, "Should have raised an exception."
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
unittest.main()
|
|
|
|
|
|
|
|
suite = unittest.makeSuite(TestCaseMisc,'test')
|