mirror of
https://github.com/clearml/wexpect-venv
synced 2025-01-30 18:36:57 +00:00
[FIX] timeout None for blocking applications; [ADD] argument type assertion in expect_exact
This commit is contained in:
parent
ca81648eb0
commit
888209d2ad
2
tests/list100.py
Normal file
2
tests/list100.py
Normal file
@ -0,0 +1,2 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
print(list(range(100)))
|
396
tests/test_expect.py
Normal file
396
tests/test_expect.py
Normal file
@ -0,0 +1,396 @@
|
||||
#!/usr/bin/env python
|
||||
'''
|
||||
wexpect LICENSE
|
||||
|
||||
This license is approved by the OSI and FSF as GPL-compatible.
|
||||
http://opensource.org/licenses/isc-license.txt
|
||||
|
||||
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
|
||||
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
|
||||
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
|
||||
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
'''
|
||||
import multiprocessing
|
||||
import unittest
|
||||
import subprocess
|
||||
import time
|
||||
import signal
|
||||
import sys
|
||||
import os
|
||||
|
||||
import wexpect
|
||||
from . import PexpectTestCase
|
||||
from .utils import no_coverage_env
|
||||
|
||||
# Many of these test cases blindly assume that sequential directory
|
||||
# listings of the /bin directory will yield the same results.
|
||||
# This may not be true, but seems adequate for testing now.
|
||||
# I should fix this at some point.
|
||||
|
||||
PYTHONBINQUOTE = '"{}"'.format(sys.executable)
|
||||
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
|
||||
def hex_dump(src, length=16):
|
||||
result=[]
|
||||
for i in range(0, len(src), length):
|
||||
s = src[i:i+length]
|
||||
hexa = ' '.join(["%02X"%ord(x) for x in s])
|
||||
printable = s.translate(FILTER)
|
||||
result.append("%04X %-*s %s\n" % (i, length*3, hexa, printable))
|
||||
return ''.join(result)
|
||||
|
||||
def hex_diff(left, right):
|
||||
diff = ['< %s\n> %s' % (_left, _right,) for _left, _right in zip(
|
||||
hex_dump(left).splitlines(), hex_dump(right).splitlines())
|
||||
if _left != _right]
|
||||
return '\n' + '\n'.join(diff,)
|
||||
|
||||
|
||||
class ExpectTestCase (PexpectTestCase.PexpectTestCase):
|
||||
|
||||
def test_expect_basic (self):
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.sendline ('Hello')
|
||||
p.sendline ('there')
|
||||
p.sendline ('Mr. Python')
|
||||
p.expect ('Hello')
|
||||
p.expect ('there')
|
||||
p.expect ('Mr. Python')
|
||||
p.sendeof ()
|
||||
p.expect (wexpect.EOF)
|
||||
|
||||
def test_expect_exact_basic (self):
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.sendline ('Hello')
|
||||
p.sendline ('there')
|
||||
p.sendline ('Mr. Python')
|
||||
p.expect_exact ('Hello')
|
||||
p.expect_exact ('there')
|
||||
p.expect_exact ('Mr. Python')
|
||||
p.sendeof ()
|
||||
p.expect_exact (wexpect.EOF)
|
||||
|
||||
def test_expect_ignore_case(self):
|
||||
'''This test that the ignorecase flag will match patterns
|
||||
even if case is different using the regex (?i) directive.
|
||||
'''
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.sendline ('HELLO')
|
||||
p.sendline ('there')
|
||||
p.expect ('(?i)hello')
|
||||
p.expect ('(?i)THERE')
|
||||
p.sendeof ()
|
||||
p.expect (wexpect.EOF)
|
||||
|
||||
def test_expect_ignore_case_flag(self):
|
||||
'''This test that the ignorecase flag will match patterns
|
||||
even if case is different using the ignorecase flag.
|
||||
'''
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.ignorecase = True
|
||||
p.sendline ('HELLO')
|
||||
p.sendline ('there')
|
||||
p.expect ('hello')
|
||||
p.expect ('THERE')
|
||||
p.sendeof ()
|
||||
p.expect (wexpect.EOF)
|
||||
|
||||
def test_expect_order (self):
|
||||
'''This tests that patterns are matched in the same order as given in the pattern_list.
|
||||
|
||||
(Or does it? Doesn't it also pass if expect() always chooses
|
||||
(one of the) the leftmost matches in the input? -- grahn)
|
||||
... agreed! -jquast, the buffer ptr isn't forwarded on match, see first two test cases
|
||||
'''
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
self._expect_order(p)
|
||||
|
||||
def test_expect_order_exact (self):
|
||||
'''Like test_expect_order(), but using expect_exact().
|
||||
'''
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.expect = p.expect_exact
|
||||
self._expect_order(p)
|
||||
|
||||
def _expect_order (self, p):
|
||||
p.sendline ('1234')
|
||||
p.sendline ('abcd')
|
||||
p.sendline ('wxyz')
|
||||
p.sendline ('7890')
|
||||
p.sendeof ()
|
||||
index = p.expect ([
|
||||
'1234',
|
||||
'abcd',
|
||||
'wxyz',
|
||||
wexpect.EOF,
|
||||
'7890' ])
|
||||
self.assertEqual(index, 0, (index, p.before, p.after))
|
||||
index = p.expect ([
|
||||
'54321',
|
||||
wexpect.TIMEOUT,
|
||||
'1234',
|
||||
'abcd',
|
||||
'wxyz',
|
||||
wexpect.EOF], timeout=5)
|
||||
self.assertEqual(index, 3, (index, p.before, p.after))
|
||||
index = p.expect ([
|
||||
'54321',
|
||||
wexpect.TIMEOUT,
|
||||
'1234',
|
||||
'abcd',
|
||||
'wxyz',
|
||||
wexpect.EOF], timeout=5)
|
||||
self.assertEqual(index, 4, (index, p.before, p.after))
|
||||
index = p.expect ([
|
||||
wexpect.EOF,
|
||||
'abcd',
|
||||
'wxyz',
|
||||
'7890' ])
|
||||
self.assertEqual(index, 3, (index, p.before, p.after))
|
||||
|
||||
index = p.expect ([
|
||||
'abcd',
|
||||
'wxyz',
|
||||
'7890',
|
||||
wexpect.EOF])
|
||||
self.assertEqual(index, 3, (index, p.before, p.after))
|
||||
|
||||
def test_expect_index (self):
|
||||
'''This tests that mixed list of regex strings, TIMEOUT, and EOF all
|
||||
return the correct index when matched.
|
||||
'''
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
self._expect_index(p)
|
||||
|
||||
def test_expect_index_exact (self):
|
||||
'''Like test_expect_index(), but using expect_exact().
|
||||
'''
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.expect = p.expect_exact
|
||||
self._expect_index(p)
|
||||
|
||||
def _expect_index (self, p):
|
||||
p.sendline ('1234')
|
||||
index = p.expect (['abcd','wxyz','1234',wexpect.EOF])
|
||||
self.assertEqual(index, 2, "index="+str(index))
|
||||
p.sendline ('abcd')
|
||||
index = p.expect ([wexpect.TIMEOUT,'abcd','wxyz','1234',wexpect.EOF])
|
||||
self.assertEqual(index, 1, "index="+str(index)+str(p))
|
||||
p.sendline ('wxyz')
|
||||
index = p.expect (['54321',wexpect.TIMEOUT,'abcd','wxyz','1234',wexpect.EOF])
|
||||
self.assertEqual(index, 3, "index="+str(index)) # Expect 'wxyz'
|
||||
p.sendline ('$*!@?')
|
||||
index = p.expect (['54321',wexpect.TIMEOUT,'abcd','wxyz','1234',wexpect.EOF],
|
||||
timeout=1)
|
||||
self.assertEqual(index, 1, "index="+str(index)) # Expect TIMEOUT
|
||||
p.sendeof ()
|
||||
index = p.expect (['54321',wexpect.TIMEOUT,'abcd','wxyz','1234',wexpect.EOF])
|
||||
self.assertEqual(index, 5, "index="+str(index)) # Expect EOF
|
||||
|
||||
def test_expect (self):
|
||||
the_old_way = subprocess.Popen(args=['ls', '-l'],
|
||||
stdout=subprocess.PIPE).communicate()[0].rstrip()
|
||||
p = wexpect.spawn('ls -l')
|
||||
the_new_way = ''
|
||||
while 1:
|
||||
i = p.expect (['\n', wexpect.EOF])
|
||||
the_new_way = the_new_way + p.before
|
||||
if i == 1:
|
||||
break
|
||||
the_new_way = the_new_way.rstrip()
|
||||
the_new_way = the_new_way.replace('\r\n', '\n'
|
||||
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
|
||||
the_old_way = the_old_way.decode('utf-8')
|
||||
the_old_way = the_old_way.replace('\r\n', '\n'
|
||||
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
|
||||
# print(repr(the_old_way))
|
||||
# print(repr(the_new_way))
|
||||
# the_old_way = the_old_way[0:10000]
|
||||
# the_new_way = the_new_way[0:10000]
|
||||
self.assertEqual(the_old_way, the_new_way)
|
||||
|
||||
def test_expect_exact (self):
|
||||
the_old_way = subprocess.Popen(args=['ls', '-l'],
|
||||
stdout=subprocess.PIPE).communicate()[0].rstrip()
|
||||
p = wexpect.spawn('ls -l')
|
||||
the_new_way = ''
|
||||
while 1:
|
||||
i = p.expect_exact (['\n', wexpect.EOF])
|
||||
the_new_way = the_new_way + p.before
|
||||
if i == 1:
|
||||
break
|
||||
the_new_way = the_new_way.replace('\r\n', '\n'
|
||||
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
|
||||
the_old_way = the_old_way.decode('utf-8')
|
||||
the_old_way = the_old_way.replace('\r\n', '\n'
|
||||
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
|
||||
self.assertEqual(the_old_way, the_new_way)
|
||||
p = wexpect.spawn('echo hello.?world')
|
||||
i = p.expect_exact('.?')
|
||||
self.assertEqual(p.before, 'hello')
|
||||
self.assertEqual(p.after, '.?')
|
||||
|
||||
def test_expect_eof (self):
|
||||
the_old_way = subprocess.Popen(args=['ls', '-l'],
|
||||
stdout=subprocess.PIPE).communicate()[0].rstrip()
|
||||
p = wexpect.spawn('ls -l')
|
||||
p.expect(wexpect.EOF) # This basically tells it to read everything. Same as wexpect.run() function.
|
||||
the_new_way = p.before
|
||||
the_new_way = the_new_way.replace('\r\n', '\n'
|
||||
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
|
||||
the_old_way = the_old_way.decode('utf-8')
|
||||
the_old_way = the_old_way.replace('\r\n', '\n'
|
||||
).replace('\r', '\n').replace('\n\n', '\n').rstrip()
|
||||
self.assertEqual(the_old_way, the_new_way)
|
||||
|
||||
def test_expect_timeout (self):
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.expect(wexpect.TIMEOUT) # This tells it to wait for timeout.
|
||||
self.assertEqual(p.after, wexpect.TIMEOUT)
|
||||
|
||||
def test_unexpected_eof (self):
|
||||
p = wexpect.spawn('ls -l /bin')
|
||||
try:
|
||||
p.expect('_Z_XY_XZ') # Probably never see this in ls output.
|
||||
except wexpect.EOF:
|
||||
pass
|
||||
else:
|
||||
self.fail ('Expected an EOF exception.')
|
||||
|
||||
def test_buffer_interface(self):
|
||||
p = wexpect.spawn('cat', timeout=5)
|
||||
p.sendline ('Hello')
|
||||
p.expect ('Hello')
|
||||
assert len(p.buffer)
|
||||
p.buffer = 'Testing'
|
||||
p.sendeof ()
|
||||
|
||||
def _before_after(self, p):
|
||||
p.timeout = 5
|
||||
|
||||
p.expect('5')
|
||||
self.assertEqual(p.after, '5')
|
||||
self.assertTrue(p.before.startswith('[0, 1, 2'), p.before)
|
||||
|
||||
p.expect('50')
|
||||
self.assertEqual(p.after, '50')
|
||||
self.assertTrue(p.before.startswith(', 6, 7, 8'), p.before[:20])
|
||||
self.assertTrue(p.before.endswith('48, 49, '), p.before[-20:])
|
||||
|
||||
p.expect(wexpect.EOF)
|
||||
self.assertEqual(p.after, wexpect.EOF)
|
||||
assert p.before.startswith(', 51, 52'), p.before[:20]
|
||||
assert p.before.endswith(', 99]\r\n'), p.before[-20:]
|
||||
|
||||
def test_before_after(self):
|
||||
'''This tests expect() for some simple before/after things.
|
||||
'''
|
||||
p = wexpect.spawn('%s -Wi list100.py' % PYTHONBINQUOTE, env=no_coverage_env())
|
||||
self._before_after(p)
|
||||
|
||||
def test_before_after_exact(self):
|
||||
'''This tests some simple before/after things, for
|
||||
expect_exact(). (Grahn broke it at one point.)
|
||||
'''
|
||||
p = wexpect.spawn('%s -Wi list100.py' % PYTHONBINQUOTE, env=no_coverage_env())
|
||||
# mangle the spawn so we test expect_exact() instead
|
||||
p.expect = p.expect_exact
|
||||
self._before_after(p)
|
||||
|
||||
def _ordering(self, p):
|
||||
p.timeout = 20
|
||||
p.expect('>>> ')
|
||||
|
||||
p.sendline('list(range(4*3))')
|
||||
self.assertEqual(p.expect(['5,', '5,']), 0)
|
||||
p.expect('>>> ')
|
||||
|
||||
p.sendline('list(range(4*3))')
|
||||
self.assertEqual(p.expect(['7,', '5,']), 1)
|
||||
p.expect('>>> ')
|
||||
|
||||
p.sendline('list(range(4*3))')
|
||||
self.assertEqual(p.expect(['5,', '7,']), 0)
|
||||
p.expect('>>> ')
|
||||
|
||||
p.sendline('list(range(4*5))')
|
||||
self.assertEqual(p.expect(['2,', '12,']), 0)
|
||||
p.expect('>>> ')
|
||||
|
||||
p.sendline('list(range(4*5))')
|
||||
self.assertEqual(p.expect(['12,', '2,']), 1)
|
||||
|
||||
def test_ordering(self):
|
||||
'''This tests expect() for which pattern is returned
|
||||
when many may eventually match. I (Grahn) am a bit
|
||||
confused about what should happen, but this test passes
|
||||
with wexpect 2.1.
|
||||
'''
|
||||
p = wexpect.spawn(PYTHONBINQUOTE)
|
||||
self._ordering(p)
|
||||
|
||||
def test_ordering_exact(self):
|
||||
'''This tests expect_exact() for which pattern is returned
|
||||
when many may eventually match. I (Grahn) am a bit
|
||||
confused about what should happen, but this test passes
|
||||
for the expect() method with wexpect 2.1.
|
||||
'''
|
||||
p = wexpect.spawn(PYTHONBINQUOTE)
|
||||
# mangle the spawn so we test expect_exact() instead
|
||||
p.expect = p.expect_exact
|
||||
self._ordering(p)
|
||||
|
||||
def _greed(self, expect):
|
||||
# End at the same point: the one with the earliest start should win
|
||||
self.assertEqual(expect(['3, 4', '2, 3, 4']), 1)
|
||||
|
||||
# Start at the same point: first pattern passed wins
|
||||
self.assertEqual(expect(['5,', '5, 6']), 0)
|
||||
|
||||
# Same pattern passed twice: first instance wins
|
||||
self.assertEqual(expect(['7, 8', '7, 8, 9', '7, 8']), 0)
|
||||
|
||||
def _greed_read1(self, expect):
|
||||
# Here, one has an earlier start and a later end. When processing
|
||||
# one character at a time, the one that finishes first should win,
|
||||
# because we don't know about the other match when it wins.
|
||||
# If maxread > 1, this behaviour is currently undefined, although in
|
||||
# most cases the one that starts first will win.
|
||||
self.assertEqual(expect(['1, 2, 3', '2,']), 1)
|
||||
|
||||
def test_greed(self):
|
||||
p = wexpect.spawn(PYTHONBINQUOTE + ' list100.py')
|
||||
self._greed(p.expect)
|
||||
|
||||
def test_greed_exact(self):
|
||||
p = wexpect.spawn(PYTHONBINQUOTE + ' list100.py')
|
||||
self._greed(p.expect_exact)
|
||||
|
||||
def test_bad_arg(self):
|
||||
p = wexpect.spawn('cat')
|
||||
with self.assertRaisesRegex(TypeError, '.*must be one of'):
|
||||
p.expect(1)
|
||||
with self.assertRaisesRegex(TypeError, '.*must be one of'):
|
||||
p.expect([1, '2'])
|
||||
with self.assertRaisesRegex(TypeError, '.*must be one of'):
|
||||
p.expect_exact(1)
|
||||
with self.assertRaisesRegex(TypeError, '.*must be one of'):
|
||||
p.expect_exact([1, '2'])
|
||||
|
||||
def test_timeout_none(self):
|
||||
p = wexpect.spawn('echo abcdef', timeout=None)
|
||||
p.expect('abc')
|
||||
p.expect_exact('def')
|
||||
p.expect(wexpect.EOF)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
suite = unittest.makeSuite(ExpectTestCase, 'test')
|
7
tests/utils.py
Normal file
7
tests/utils.py
Normal file
@ -0,0 +1,7 @@
|
||||
import os
|
||||
|
||||
def no_coverage_env():
|
||||
"Return a copy of os.environ that won't trigger coverage measurement."
|
||||
env = os.environ.copy()
|
||||
env.pop('COV_CORE_SOURCE', None)
|
||||
return env
|
@ -1093,8 +1093,13 @@ class spawn_windows ():
|
||||
This method is also useful when you don't want to have to worry about
|
||||
escaping regular expression characters that you want to match."""
|
||||
|
||||
if type(pattern_list) in (str,) or pattern_list in (TIMEOUT, EOF):
|
||||
if not isinstance(pattern_list, list):
|
||||
pattern_list = [pattern_list]
|
||||
|
||||
for p in pattern_list:
|
||||
if type(p) not in (str,) and p not in (TIMEOUT, EOF):
|
||||
raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p)))
|
||||
|
||||
return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
|
||||
|
||||
def expect_loop(self, searcher, timeout = -1, searchwindowsize = -1):
|
||||
@ -1127,7 +1132,7 @@ class spawn_windows ():
|
||||
self.match_index = index
|
||||
return self.match_index
|
||||
# No match at this point
|
||||
if timeout < 0 and timeout is not None:
|
||||
if timeout is not None and timeout < 0:
|
||||
raise TIMEOUT ('Timeout exceeded in expect_any().')
|
||||
# Still have time left, so read more data
|
||||
c = self.read_nonblocking(self.maxread, timeout)
|
||||
|
Loading…
Reference in New Issue
Block a user