mirror of
https://github.com/clearml/wexpect-venv
synced 2025-01-31 02:46:59 +00:00
160 lines
5.2 KiB
Python
160 lines
5.2 KiB
Python
|
#!/usr/bin/env python
|
||
|
# encoding: utf-8
|
||
|
'''
|
||
|
wexpect LICENSE
|
||
|
|
||
|
This license is approved by the OSI and FSF as GPL-compatible.
|
||
|
http://opensource.org/licenses/isc-license.txt
|
||
|
|
||
|
Copyright (c) 2012, Noah Spurrier <noah@noah.org>
|
||
|
PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
|
||
|
PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
|
||
|
COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
|
||
|
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||
|
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||
|
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||
|
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||
|
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||
|
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||
|
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||
|
|
||
|
'''
|
||
|
import wexpect
|
||
|
import unittest
|
||
|
import subprocess
|
||
|
import tempfile
|
||
|
import sys
|
||
|
import os
|
||
|
import re
|
||
|
from . import PexpectTestCase
|
||
|
|
||
|
unicode_type = str
|
||
|
|
||
|
|
||
|
def timeout_callback(values):
|
||
|
if values["event_count"] > 3:
|
||
|
return 1
|
||
|
return 0
|
||
|
|
||
|
|
||
|
def function_events_callback(values):
|
||
|
try:
|
||
|
previous_echoed = values["child_result_list"][-1]
|
||
|
# lets pick up the line with valuable
|
||
|
previous_echoed = previous_echoed.splitlines()[-3].strip()
|
||
|
if previous_echoed.endswith("reserved."):
|
||
|
return "echo stage-1\r\n"
|
||
|
if previous_echoed.endswith("stage-1"):
|
||
|
return "echo stage-2\r\n"
|
||
|
elif previous_echoed.endswith("stage-2"):
|
||
|
return "echo stage-3\r\n"
|
||
|
elif previous_echoed.endswith("stage-3"):
|
||
|
return "exit\r\n"
|
||
|
else:
|
||
|
raise Exception("Unexpected output {0}".format(previous_echoed))
|
||
|
except IndexError:
|
||
|
return "echo stage-1\r\n"
|
||
|
|
||
|
|
||
|
class RunFuncTestCase(PexpectTestCase.PexpectTestCase):
|
||
|
runfunc = staticmethod(wexpect.run)
|
||
|
cr = '\r'
|
||
|
empty = ''
|
||
|
prep_subprocess_out = staticmethod(lambda x: x)
|
||
|
|
||
|
def test_run_exit(self):
|
||
|
(data, exitstatus) = self.runfunc('python exit1.py', withexitstatus=1)
|
||
|
assert exitstatus == 1, "Exit status of 'python exit1.py' should be 1."
|
||
|
|
||
|
def test_run(self):
|
||
|
the_old_way = subprocess.Popen(
|
||
|
args=['uname', '-m', '-n'],
|
||
|
stdout=subprocess.PIPE
|
||
|
).communicate()[0].rstrip()
|
||
|
|
||
|
(the_new_way, exitstatus) = self.runfunc(
|
||
|
'uname -m -n', withexitstatus=1)
|
||
|
the_new_way = the_new_way.replace(self.cr, self.empty).rstrip()
|
||
|
|
||
|
self.assertEqual(self.prep_subprocess_out(the_old_way).decode('utf-8'), the_new_way)
|
||
|
self.assertEqual(exitstatus, 0)
|
||
|
|
||
|
def test_run_callback(self):
|
||
|
# TODO it seems like this test could block forever if run fails...
|
||
|
events = {wexpect.TIMEOUT: timeout_callback}
|
||
|
self.runfunc("cat", timeout=1, events=events)
|
||
|
|
||
|
def test_run_bad_exitstatus(self):
|
||
|
(the_new_way, exitstatus) = self.runfunc(
|
||
|
'ls -l /najoeufhdnzkxjd', withexitstatus=1)
|
||
|
assert exitstatus != 0
|
||
|
|
||
|
def test_run_event_as_string(self):
|
||
|
re_flags = re.DOTALL | re.MULTILINE
|
||
|
events = {
|
||
|
# second match on 'abc', echo 'def'
|
||
|
re.compile('abc.*>', re_flags): 'echo "def"\r\n',
|
||
|
# final match on 'def': exit
|
||
|
re.compile('def.*>', re_flags): 'exit\r\n',
|
||
|
# first match on 'GO:' prompt, echo 'abc'
|
||
|
re.compile('Microsoft.*>', re_flags): 'echo "abc"\r\n'
|
||
|
}
|
||
|
|
||
|
(data, exitstatus) = wexpect.run(
|
||
|
'cmd',
|
||
|
withexitstatus=True,
|
||
|
events=events,
|
||
|
timeout=5)
|
||
|
assert exitstatus == 0
|
||
|
|
||
|
def test_run_event_as_function(self):
|
||
|
events = {'>': function_events_callback}
|
||
|
|
||
|
(data, exitstatus) = wexpect.run(
|
||
|
'cmd',
|
||
|
withexitstatus=True,
|
||
|
events=events,
|
||
|
timeout=10)
|
||
|
assert exitstatus == 0
|
||
|
|
||
|
# Unsupported
|
||
|
# def test_run_event_as_method(self):
|
||
|
# events = {
|
||
|
# '>': RunFuncTestCase._method_events_callback
|
||
|
# }
|
||
|
#
|
||
|
# (data, exitstatus) = wexpect.run(
|
||
|
# 'cmd',
|
||
|
# withexitstatus=True,
|
||
|
# events=events,
|
||
|
# timeout=10)
|
||
|
# assert exitstatus == 0
|
||
|
|
||
|
def test_run_event_typeerror(self):
|
||
|
events = {'>': -1}
|
||
|
with self.assertRaises(TypeError):
|
||
|
wexpect.run('cmd',
|
||
|
withexitstatus=True,
|
||
|
events=events,
|
||
|
timeout=10)
|
||
|
|
||
|
def _method_events_callback(self, values):
|
||
|
try:
|
||
|
previous_echoed = (values["child_result_list"][-1].decode()
|
||
|
.split("\n")[-2].strip())
|
||
|
if previous_echoed.endswith("foo1"):
|
||
|
return "echo foo2\n"
|
||
|
elif previous_echoed.endswith("foo2"):
|
||
|
return "echo foo3\n"
|
||
|
elif previous_echoed.endswith("foo3"):
|
||
|
return "exit\n"
|
||
|
else:
|
||
|
raise Exception("Unexpected output {0!r}"
|
||
|
.format(previous_echoed))
|
||
|
except IndexError:
|
||
|
return "echo foo1\n"
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
unittest.main()
|