diff --git a/tests/echo_w_prompt.py b/tests/echo_w_prompt.py new file mode 100644 index 0000000..3c80553 --- /dev/null +++ b/tests/echo_w_prompt.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +from __future__ import print_function + +try: + raw_input +except NameError: + raw_input = input + +while True: + try: + a = raw_input('') + except EOFError: + print('') + break + print('', a, sep='') diff --git a/tests/test_misc.py b/tests/test_misc.py new file mode 100644 index 0000000..8f5d4f9 --- /dev/null +++ b/tests/test_misc.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python +''' +wexpect LICENSE + + This license is approved by the OSI and FSF as GPL-compatible. + http://opensource.org/licenses/isc-license.txt + + Copyright (c) 2012, Noah Spurrier + PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY + PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE + COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +''' +import unittest +import sys +import re +import signal +import time +import tempfile +import os + +import wexpect +from . import PexpectTestCase + +# the program cat(1) may display ^D\x08\x08 when \x04 (EOF, Ctrl-D) is sent +_CAT_EOF = '^D\x08\x08' + +def _u(s): + return s + +PYBIN = '"{}"'.format(sys.executable) + +class TestCaseMisc(PexpectTestCase.PexpectTestCase): + + def test_isatty(self): + " Test isatty() is True after spawning process on most platforms. " + child = wexpect.spawn('cat') + if not child.isatty() and sys.platform.lower().startswith('sunos'): + if hasattr(unittest, 'SkipTest'): + raise unittest.SkipTest("Not supported on this platform.") + return 'skip' + assert child.isatty() + + def test_read(self): + " Test spawn.read by calls of various size. " + child = wexpect.spawn('cat') + child.sendline("abc") + child.sendeof() + child.readline() + self.assertEqual(child.read(0), '') + self.assertEqual(child.read(1), 'a') + self.assertEqual(child.read(1), 'b') + self.assertEqual(child.read(1), 'c') + self.assertEqual(child.read(2), '\r\n') + + def test_readline_bin_echo(self): + " Test spawn('echo'). " + # given, + child = wexpect.spawn('echo', ['alpha', 'beta']) + + # exercise, + self.assertEqual(child.readline(), 'alpha beta\r\n') + + def test_readline(self): + " Test spawn.readline(). " + # when argument 0 is sent, nothing is returned. + # Otherwise the argument value is meaningless. + child = wexpect.spawn('cat') + child.sendline("alpha") + child.sendline("beta") + child.sendline("gamma") + child.sendline("delta") + child.sendeof() + self.assertEqual(child.readline(0), '') + child.readline().rstrip() + self.assertEqual(child.readline().rstrip(), 'alpha') + child.readline().rstrip() + self.assertEqual(child.readline(1).rstrip(), 'beta') + child.readline().rstrip() + self.assertEqual(child.readline(2).rstrip(), 'gamma') + child.readline().rstrip() + self.assertEqual(child.readline().rstrip(), 'delta') + child.expect(wexpect.EOF) + assert not child.isalive() + self.assertEqual(child.exitstatus, 0) + + def test_iter(self): + " iterating over lines of spawn.__iter__(). " + child = wexpect.spawn('echo "abc\r\n123"') + # Don't use ''.join() because we want to test __iter__(). + page = '' + for line in child: + page += line + page = page.replace(_CAT_EOF, '') + self.assertEqual(page, 'abc\r\n123\r\n') + + def test_readlines(self): + " reading all lines of spawn.readlines(). " + child = wexpect.spawn('cat') + child.sendline("abc") + child.sendline("123") + child.sendeof() + page = ''.join(child.readlines()).replace(_CAT_EOF, '') + self.assertEqual(page, '\r\nabc\r\n\r\n123\r\n') + child.expect(wexpect.EOF) + assert not child.isalive() + self.assertEqual(child.exitstatus, 0) + + def test_write(self): + " write a character and return it in return. " + child = wexpect.spawn('cat') + child.write('a') + child.write('\r') + child.readline() + self.assertEqual(child.readline(), 'a\r\n') + + def test_writelines(self): + " spawn.writelines() " + child = wexpect.spawn('cat') + # notice that much like file.writelines, we do not delimit by newline + # -- it is equivalent to calling write(''.join([args,])) + child.writelines(['abc', '123', 'xyz', '\r']) + child.sendeof() + child.readline() + line = child.readline() + self.assertEqual(line, 'abc123xyz\r\n') + + def test_eof(self): + " call to expect() after EOF is received raises wexpect.EOF " + child = wexpect.spawn('cat') + child.sendeof() + with self.assertRaises(wexpect.EOF): + child.expect('the unexpected') + + def test_with(self): + "spawn can be used as a context manager" + with wexpect.spawn(PYBIN + ' echo_w_prompt.py') as p: + p.expect('') + p.sendline('alpha') + p.expect('alpha') + assert p.isalive() + + assert not p.isalive() + + def test_terminate(self): + " test force terminate always succeeds (SIGKILL). " + child = wexpect.spawn('cat') + child.terminate(force=1) + assert child.terminated + + def test_bad_arguments_suggest_fdpsawn(self): + " assert custom exception for spawn(int). " + expect_errmsg = "maybe you want to use fdpexpect.fdspawn" + with self.assertRaisesRegex(wexpect.ExceptionPexpect, + ".*" + expect_errmsg): + wexpect.spawn(1) + + def test_bad_arguments_second_arg_is_list(self): + " Second argument to spawn, if used, must be only a list." + with self.assertRaises(TypeError): + wexpect.spawn('ls', '-la') + + with self.assertRaises(TypeError): + # not even a tuple, + wexpect.spawn('ls', ('-la',)) + + def test_read_after_close_raises_value_error(self): + " Calling read_nonblocking after close raises ValueError. " + # as read_nonblocking underlies all other calls to read, + # ValueError should be thrown for all forms of read. + with self.assertRaises(ValueError): + p = wexpect.spawn('cat') + p.close() + p.read_nonblocking() + + with self.assertRaises(ValueError): + p = wexpect.spawn('cat') + p.close() + p.read() + + with self.assertRaises(ValueError): + p = wexpect.spawn('cat') + p.close() + p.readline() + + with self.assertRaises(ValueError): + p = wexpect.spawn('cat') + p.close() + p.readlines() + + def test_isalive(self): + " check isalive() before and after EOF. (True, False) " + child = wexpect.spawn('cat') + assert child.isalive() is True + child.sendeof() + child.expect(wexpect.EOF) + assert child.isalive() is False + + def test_bad_type_in_expect(self): + " expect() does not accept dictionary arguments. " + child = wexpect.spawn('cat') + with self.assertRaises(TypeError): + child.expect({}) + + def test_cwd(self): + " check keyword argument `cwd=' of wexpect.run() " + try: + os.mkdir('cwd_tmp') + except: + pass + tmp_dir = os.path.realpath('cwd_tmp') + child = wexpect.spawn('cmd') + child.expect('>') + child.sendline('cd') + child.expect('>') + default = child.before.splitlines()[1] + child.terminate() + child = wexpect.spawn('cmd', cwd=tmp_dir) + child.expect('>') + child.sendline('cd') + child.expect('>') + pwd_tmp = child.before.splitlines()[1] + child.terminate() + self.assertNotEqual(default, pwd_tmp) + self.assertEqual(tmp_dir, _u(pwd_tmp)) + + def _test_searcher_as(self, searcher, plus=None): + # given, + given_words = ['alpha', 'beta', 'gamma', 'delta', ] + given_search = given_words + if searcher == wexpect.searcher_re: + given_search = [re.compile(word) for word in given_words] + if plus is not None: + given_search = given_search + [plus] + search_string = searcher(given_search) + basic_fmt = '\n {0}: {1}' + fmt = basic_fmt + if searcher is wexpect.searcher_re: + fmt = '\n {0}: re.compile({1})' + expected_output = '{0}:'.format(searcher.__name__) + idx = 0 + for word in given_words: + expected_output += fmt.format(idx, '"{0}"'.format(word)) + idx += 1 + if plus is not None: + if plus == wexpect.EOF: + expected_output += basic_fmt.format(idx, 'EOF') + elif plus == wexpect.TIMEOUT: + expected_output += basic_fmt.format(idx, 'TIMEOUT') + + # exercise, + self.assertEqual(search_string.__str__(), expected_output) + + def test_searcher_as_string(self): + " check searcher_string(..).__str__() " + self._test_searcher_as(wexpect.searcher_string) + + def test_searcher_as_string_with_EOF(self): + " check searcher_string(..).__str__() that includes EOF " + self._test_searcher_as(wexpect.searcher_string, plus=wexpect.EOF) + + def test_searcher_as_string_with_TIMEOUT(self): + " check searcher_string(..).__str__() that includes TIMEOUT " + self._test_searcher_as(wexpect.searcher_string, plus=wexpect.TIMEOUT) + + def test_searcher_re_as_string(self): + " check searcher_re(..).__str__() " + self._test_searcher_as(wexpect.searcher_re) + + def test_searcher_re_as_string_with_EOF(self): + " check searcher_re(..).__str__() that includes EOF " + self._test_searcher_as(wexpect.searcher_re, plus=wexpect.EOF) + + def test_searcher_re_as_string_with_TIMEOUT(self): + " check searcher_re(..).__str__() that includes TIMEOUT " + self._test_searcher_as(wexpect.searcher_re, plus=wexpect.TIMEOUT) + + def test_exception_tb(self): + " test get_trace() filters away wexpect/__init__.py calls. " + p = wexpect.spawn('sleep 1') + try: + p.expect('BLAH') + except wexpect.ExceptionPexpect as e: + # get_trace should filter out frames in wexpect's own code + tb = e.get_trace() + # exercise, + assert 'raise ' not in tb + assert 'wexpect/__init__.py' not in tb + else: + assert False, "Should have raised an exception." + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(TestCaseMisc,'test') diff --git a/wexpect.py b/wexpect.py index aaa8bbf..279a9f3 100644 --- a/wexpect.py +++ b/wexpect.py @@ -162,9 +162,7 @@ class ExceptionPexpect(Exception): return str(self.value) - def get_trace(self): # pragma: no cover - warnings.warn(deprecation_warning.format("ExceptionPexpect::get_trace"), DeprecationWarning) - + def get_trace(self): """This returns an abbreviated stack trace with lines that only concern the caller. In other words, the stack trace inside the Wexpect module is not included. """ @@ -174,9 +172,7 @@ class ExceptionPexpect(Exception): tblist = traceback.format_list(tblist) return ''.join(tblist) - def __filter_not_wexpect(self, trace_list_item): # pragma: no cover - warnings.warn(deprecation_warning.format("ExceptionPexpect::__filter_not_wexpect"), DeprecationWarning) - + def __filter_not_wexpect(self, trace_list_item): """This returns True if list item 0 the string 'wexpect.py' in it. """ if trace_list_item[0].find('wexpect.py') == -1: @@ -721,10 +717,16 @@ class spawn_windows (): """ result = self.readline() - if result == "": + if self.after == self.delimiter: raise StopIteration return result + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.terminate() + def readlines (self, sizehint = -1): # File-like object. """This reads until EOF using readline() and returns a list containing