mirror of
https://github.com/clearml/wexpect-venv
synced 2025-01-30 18:36:57 +00:00
[ADD] __enter__ and __exit__ methods; [FIX] iteration; [ADD][TST] test_misc
This commit is contained in:
parent
888209d2ad
commit
2e6ac6bc1f
15
tests/echo_w_prompt.py
Normal file
15
tests/echo_w_prompt.py
Normal file
@ -0,0 +1,15 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import print_function
|
||||
|
||||
try:
|
||||
raw_input
|
||||
except NameError:
|
||||
raw_input = input
|
||||
|
||||
while True:
|
||||
try:
|
||||
a = raw_input('<in >')
|
||||
except EOFError:
|
||||
print('<eof>')
|
||||
break
|
||||
print('<out>', a, sep='')
|
302
tests/test_misc.py
Normal file
302
tests/test_misc.py
Normal file
@ -0,0 +1,302 @@
|
||||
#!/usr/bin/env python
|
||||
'''
|
||||
wexpect LICENSE
|
||||
|
||||
This license is approved by the OSI and FSF as GPL-compatible.
|
||||
http://opensource.org/licenses/isc-license.txt
|
||||
|
||||
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
|
||||
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
|
||||
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
|
||||
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
'''
|
||||
import unittest
|
||||
import sys
|
||||
import re
|
||||
import signal
|
||||
import time
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
import wexpect
|
||||
from . import PexpectTestCase
|
||||
|
||||
# the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent
|
||||
_CAT_EOF = '^D\x08\x08'
|
||||
|
||||
def _u(s):
|
||||
return s
|
||||
|
||||
PYBIN = '"{}"'.format(sys.executable)
|
||||
|
||||
class TestCaseMisc(PexpectTestCase.PexpectTestCase):
|
||||
|
||||
def test_isatty(self):
|
||||
" Test isatty() is True after spawning process on most platforms. "
|
||||
child = wexpect.spawn('cat')
|
||||
if not child.isatty() and sys.platform.lower().startswith('sunos'):
|
||||
if hasattr(unittest, 'SkipTest'):
|
||||
raise unittest.SkipTest("Not supported on this platform.")
|
||||
return 'skip'
|
||||
assert child.isatty()
|
||||
|
||||
def test_read(self):
|
||||
" Test spawn.read by calls of various size. "
|
||||
child = wexpect.spawn('cat')
|
||||
child.sendline("abc")
|
||||
child.sendeof()
|
||||
child.readline()
|
||||
self.assertEqual(child.read(0), '')
|
||||
self.assertEqual(child.read(1), 'a')
|
||||
self.assertEqual(child.read(1), 'b')
|
||||
self.assertEqual(child.read(1), 'c')
|
||||
self.assertEqual(child.read(2), '\r\n')
|
||||
|
||||
def test_readline_bin_echo(self):
|
||||
" Test spawn('echo'). "
|
||||
# given,
|
||||
child = wexpect.spawn('echo', ['alpha', 'beta'])
|
||||
|
||||
# exercise,
|
||||
self.assertEqual(child.readline(), 'alpha beta\r\n')
|
||||
|
||||
def test_readline(self):
|
||||
" Test spawn.readline(). "
|
||||
# when argument 0 is sent, nothing is returned.
|
||||
# Otherwise the argument value is meaningless.
|
||||
child = wexpect.spawn('cat')
|
||||
child.sendline("alpha")
|
||||
child.sendline("beta")
|
||||
child.sendline("gamma")
|
||||
child.sendline("delta")
|
||||
child.sendeof()
|
||||
self.assertEqual(child.readline(0), '')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline().rstrip(), 'alpha')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline(1).rstrip(), 'beta')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline(2).rstrip(), 'gamma')
|
||||
child.readline().rstrip()
|
||||
self.assertEqual(child.readline().rstrip(), 'delta')
|
||||
child.expect(wexpect.EOF)
|
||||
assert not child.isalive()
|
||||
self.assertEqual(child.exitstatus, 0)
|
||||
|
||||
def test_iter(self):
|
||||
" iterating over lines of spawn.__iter__(). "
|
||||
child = wexpect.spawn('echo "abc\r\n123"')
|
||||
# Don't use ''.join() because we want to test __iter__().
|
||||
page = ''
|
||||
for line in child:
|
||||
page += line
|
||||
page = page.replace(_CAT_EOF, '')
|
||||
self.assertEqual(page, 'abc\r\n123\r\n')
|
||||
|
||||
def test_readlines(self):
|
||||
" reading all lines of spawn.readlines(). "
|
||||
child = wexpect.spawn('cat')
|
||||
child.sendline("abc")
|
||||
child.sendline("123")
|
||||
child.sendeof()
|
||||
page = ''.join(child.readlines()).replace(_CAT_EOF, '')
|
||||
self.assertEqual(page, '\r\nabc\r\n\r\n123\r\n')
|
||||
child.expect(wexpect.EOF)
|
||||
assert not child.isalive()
|
||||
self.assertEqual(child.exitstatus, 0)
|
||||
|
||||
def test_write(self):
|
||||
" write a character and return it in return. "
|
||||
child = wexpect.spawn('cat')
|
||||
child.write('a')
|
||||
child.write('\r')
|
||||
child.readline()
|
||||
self.assertEqual(child.readline(), 'a\r\n')
|
||||
|
||||
def test_writelines(self):
|
||||
" spawn.writelines() "
|
||||
child = wexpect.spawn('cat')
|
||||
# notice that much like file.writelines, we do not delimit by newline
|
||||
# -- it is equivalent to calling write(''.join([args,]))
|
||||
child.writelines(['abc', '123', 'xyz', '\r'])
|
||||
child.sendeof()
|
||||
child.readline()
|
||||
line = child.readline()
|
||||
self.assertEqual(line, 'abc123xyz\r\n')
|
||||
|
||||
def test_eof(self):
|
||||
" call to expect() after EOF is received raises wexpect.EOF "
|
||||
child = wexpect.spawn('cat')
|
||||
child.sendeof()
|
||||
with self.assertRaises(wexpect.EOF):
|
||||
child.expect('the unexpected')
|
||||
|
||||
def test_with(self):
|
||||
"spawn can be used as a context manager"
|
||||
with wexpect.spawn(PYBIN + ' echo_w_prompt.py') as p:
|
||||
p.expect('<in >')
|
||||
p.sendline('alpha')
|
||||
p.expect('<out>alpha')
|
||||
assert p.isalive()
|
||||
|
||||
assert not p.isalive()
|
||||
|
||||
def test_terminate(self):
|
||||
" test force terminate always succeeds (SIGKILL). "
|
||||
child = wexpect.spawn('cat')
|
||||
child.terminate(force=1)
|
||||
assert child.terminated
|
||||
|
||||
def test_bad_arguments_suggest_fdpsawn(self):
|
||||
" assert custom exception for spawn(int). "
|
||||
expect_errmsg = "maybe you want to use fdpexpect.fdspawn"
|
||||
with self.assertRaisesRegex(wexpect.ExceptionPexpect,
|
||||
".*" + expect_errmsg):
|
||||
wexpect.spawn(1)
|
||||
|
||||
def test_bad_arguments_second_arg_is_list(self):
|
||||
" Second argument to spawn, if used, must be only a list."
|
||||
with self.assertRaises(TypeError):
|
||||
wexpect.spawn('ls', '-la')
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
# not even a tuple,
|
||||
wexpect.spawn('ls', ('-la',))
|
||||
|
||||
def test_read_after_close_raises_value_error(self):
|
||||
" Calling read_nonblocking after close raises ValueError. "
|
||||
# as read_nonblocking underlies all other calls to read,
|
||||
# ValueError should be thrown for all forms of read.
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat')
|
||||
p.close()
|
||||
p.read_nonblocking()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat')
|
||||
p.close()
|
||||
p.read()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat')
|
||||
p.close()
|
||||
p.readline()
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
p = wexpect.spawn('cat')
|
||||
p.close()
|
||||
p.readlines()
|
||||
|
||||
def test_isalive(self):
|
||||
" check isalive() before and after EOF. (True, False) "
|
||||
child = wexpect.spawn('cat')
|
||||
assert child.isalive() is True
|
||||
child.sendeof()
|
||||
child.expect(wexpect.EOF)
|
||||
assert child.isalive() is False
|
||||
|
||||
def test_bad_type_in_expect(self):
|
||||
" expect() does not accept dictionary arguments. "
|
||||
child = wexpect.spawn('cat')
|
||||
with self.assertRaises(TypeError):
|
||||
child.expect({})
|
||||
|
||||
def test_cwd(self):
|
||||
" check keyword argument `cwd=' of wexpect.run() "
|
||||
try:
|
||||
os.mkdir('cwd_tmp')
|
||||
except:
|
||||
pass
|
||||
tmp_dir = os.path.realpath('cwd_tmp')
|
||||
child = wexpect.spawn('cmd')
|
||||
child.expect('>')
|
||||
child.sendline('cd')
|
||||
child.expect('>')
|
||||
default = child.before.splitlines()[1]
|
||||
child.terminate()
|
||||
child = wexpect.spawn('cmd', cwd=tmp_dir)
|
||||
child.expect('>')
|
||||
child.sendline('cd')
|
||||
child.expect('>')
|
||||
pwd_tmp = child.before.splitlines()[1]
|
||||
child.terminate()
|
||||
self.assertNotEqual(default, pwd_tmp)
|
||||
self.assertEqual(tmp_dir, _u(pwd_tmp))
|
||||
|
||||
def _test_searcher_as(self, searcher, plus=None):
|
||||
# given,
|
||||
given_words = ['alpha', 'beta', 'gamma', 'delta', ]
|
||||
given_search = given_words
|
||||
if searcher == wexpect.searcher_re:
|
||||
given_search = [re.compile(word) for word in given_words]
|
||||
if plus is not None:
|
||||
given_search = given_search + [plus]
|
||||
search_string = searcher(given_search)
|
||||
basic_fmt = '\n {0}: {1}'
|
||||
fmt = basic_fmt
|
||||
if searcher is wexpect.searcher_re:
|
||||
fmt = '\n {0}: re.compile({1})'
|
||||
expected_output = '{0}:'.format(searcher.__name__)
|
||||
idx = 0
|
||||
for word in given_words:
|
||||
expected_output += fmt.format(idx, '"{0}"'.format(word))
|
||||
idx += 1
|
||||
if plus is not None:
|
||||
if plus == wexpect.EOF:
|
||||
expected_output += basic_fmt.format(idx, 'EOF')
|
||||
elif plus == wexpect.TIMEOUT:
|
||||
expected_output += basic_fmt.format(idx, 'TIMEOUT')
|
||||
|
||||
# exercise,
|
||||
self.assertEqual(search_string.__str__(), expected_output)
|
||||
|
||||
def test_searcher_as_string(self):
|
||||
" check searcher_string(..).__str__() "
|
||||
self._test_searcher_as(wexpect.searcher_string)
|
||||
|
||||
def test_searcher_as_string_with_EOF(self):
|
||||
" check searcher_string(..).__str__() that includes EOF "
|
||||
self._test_searcher_as(wexpect.searcher_string, plus=wexpect.EOF)
|
||||
|
||||
def test_searcher_as_string_with_TIMEOUT(self):
|
||||
" check searcher_string(..).__str__() that includes TIMEOUT "
|
||||
self._test_searcher_as(wexpect.searcher_string, plus=wexpect.TIMEOUT)
|
||||
|
||||
def test_searcher_re_as_string(self):
|
||||
" check searcher_re(..).__str__() "
|
||||
self._test_searcher_as(wexpect.searcher_re)
|
||||
|
||||
def test_searcher_re_as_string_with_EOF(self):
|
||||
" check searcher_re(..).__str__() that includes EOF "
|
||||
self._test_searcher_as(wexpect.searcher_re, plus=wexpect.EOF)
|
||||
|
||||
def test_searcher_re_as_string_with_TIMEOUT(self):
|
||||
" check searcher_re(..).__str__() that includes TIMEOUT "
|
||||
self._test_searcher_as(wexpect.searcher_re, plus=wexpect.TIMEOUT)
|
||||
|
||||
def test_exception_tb(self):
|
||||
" test get_trace() filters away wexpect/__init__.py calls. "
|
||||
p = wexpect.spawn('sleep 1')
|
||||
try:
|
||||
p.expect('BLAH')
|
||||
except wexpect.ExceptionPexpect as e:
|
||||
# get_trace should filter out frames in wexpect's own code
|
||||
tb = e.get_trace()
|
||||
# exercise,
|
||||
assert 'raise ' not in tb
|
||||
assert 'wexpect/__init__.py' not in tb
|
||||
else:
|
||||
assert False, "Should have raised an exception."
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
suite = unittest.makeSuite(TestCaseMisc,'test')
|
16
wexpect.py
16
wexpect.py
@ -162,9 +162,7 @@ class ExceptionPexpect(Exception):
|
||||
|
||||
return str(self.value)
|
||||
|
||||
def get_trace(self): # pragma: no cover
|
||||
warnings.warn(deprecation_warning.format("ExceptionPexpect::get_trace"), DeprecationWarning)
|
||||
|
||||
def get_trace(self):
|
||||
"""This returns an abbreviated stack trace with lines that only concern
|
||||
the caller. In other words, the stack trace inside the Wexpect module
|
||||
is not included. """
|
||||
@ -174,9 +172,7 @@ class ExceptionPexpect(Exception):
|
||||
tblist = traceback.format_list(tblist)
|
||||
return ''.join(tblist)
|
||||
|
||||
def __filter_not_wexpect(self, trace_list_item): # pragma: no cover
|
||||
warnings.warn(deprecation_warning.format("ExceptionPexpect::__filter_not_wexpect"), DeprecationWarning)
|
||||
|
||||
def __filter_not_wexpect(self, trace_list_item):
|
||||
"""This returns True if list item 0 the string 'wexpect.py' in it. """
|
||||
|
||||
if trace_list_item[0].find('wexpect.py') == -1:
|
||||
@ -721,10 +717,16 @@ class spawn_windows ():
|
||||
"""
|
||||
|
||||
result = self.readline()
|
||||
if result == "":
|
||||
if self.after == self.delimiter:
|
||||
raise StopIteration
|
||||
return result
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.terminate()
|
||||
|
||||
def readlines (self, sizehint = -1): # File-like object.
|
||||
|
||||
"""This reads until EOF using readline() and returns a list containing
|
||||
|
Loading…
Reference in New Issue
Block a user