[MRG] #10 merged to test

This commit is contained in:
Benedek Racz 2019-11-04 14:52:39 +01:00
commit 3484a98cdc
8 changed files with 178 additions and 75 deletions

View File

@ -41,6 +41,19 @@ child.sendline('exit')
For more information see [examples](./examples) folder.
## Code Clean up!
Wexpect works only on Windows platforms. There are handy tools for other platforms. Therefore I will
remove any non-windows code. If you see following warning in your console please contact me to
prevent the removal of that function.
```
################################## WARNING ##################################
<some func> is deprecated, and will be removed soon.
Please contact me and report it at github.com/raczben/wexpect if you use it.
################################## WARNING ##################################
```
---
## What is it?

View File

@ -11,7 +11,6 @@ environment:
testpypipw:
secure: CcyBI8e/2LdIT2aYIytTAgR4795DNBDM/ztsz1kqZYYOeNc3zlJWLdYWrnjCHn5W6/ZcAHrsxCdCMHvtr6PIVgBRpl2RR3fk2jKTzKqJJsLW871q30BsE0kws32f1IiqfjVtLn8BUC91IJ2xBBXtOYktf1tCMi3zJMSF9+MIOQKIu298bIRnD1Lc+4lzcSZJOn4I7dOMdzlcCMRqhtO58TGwR/hD+22FHjyWVB8nLL18AO+XXS9lHSOUrH6rD5NYvVFZD68oV/RrCGAjRmfMnw==
build: off
install:
@ -39,9 +38,18 @@ after_test:
# Create source distribution.
- python -m setup sdist
# Upload to test pypi
- twine upload -r testpypi dist\\wexpect*.tar.gz
# Upload to pypi.
# More precisely. The master and release builds will be uploaded to pypi. Test branch will be
# uploaded to test-pypi the test builds.
# See more at https://stackoverflow.com/a/39155147/2506522
# Upload to offitial pypi
- twine upload -r pypi dist\\wexpect*.tar.gz
- IF %APPVEYOR_REPO_BRANCH%==master (
twine upload -r pypi dist\\wexpect*.tar.gz
)
- IF %APPVEYOR_REPO_TAG%==true (
twine upload -r pypi dist\\wexpect*.tar.gz
)
- IF %APPVEYOR_REPO_BRANCH%==test (
twine upload -r testpypi dist\\wexpect*.tar.gz
)

4
codecov.yml Normal file
View File

@ -0,0 +1,4 @@
# Settings for https://codecov.io/gh/raczben/wexpect/
ignore:
- tests/* # ignore folders and all its contents

View File

@ -1,36 +0,0 @@
# A simple example code for wexpect
from __future__ import print_function
import sys
import os
here = os.path.dirname(os.path.abspath(__file__))
wexpectPath = os.path.dirname(here)
#sys.path.insert(0, wexpectPath)
import wexpect
# Path of cmd executable:
cmdPathes = ['C:\Windows\System32\cmd.exe', 'cmd.exe', 'cmd']
cmdPrompt = '>'
for cmdPath in cmdPathes:
# Start the child process
p = wexpect.spawn(cmdPath)
# Wait for prompt
p.expect(cmdPrompt)
# print the texts
print(p.before, end='')
print(p.match.group(0), end='')
# Send a command
p.sendline('ls')
p.expect(cmdPrompt)
# print the texts
print(p.before, end='')
print(p.match.group(0), end='')

15
tests/lines_printer.py Normal file
View File

@ -0,0 +1,15 @@
'''
This is is a very basic stdio handler script. This is used by python.py example.
'''
import time
# Read an integer from the user:
print('Give a small integer: ', end='')
num = input()
# Wait the given time
for i in range(int(num)):
print('waiter ' + str(i))
time.sleep(0.2)
print('Bye!')

View File

@ -33,6 +33,10 @@ class TestCaseConstructor(PexpectTestCase.PexpectTestCase):
p2 = wexpect.spawn('uname', ['-m', '-n', '-p', '-r', '-s', '-v'])
p2.expect(wexpect.EOF)
self.assertEqual(p1.before, p2.before)
self.assertEqual(str(p1).splitlines()[1:9], str(p2).splitlines()[1:9])
run_result = wexpect.run('uname -m -n -p -r -s -v')
self.assertEqual(run_result, p2.before)
def test_named_parameters (self):
'''This tests that named parameters work.

83
tests/test_readline.py Normal file
View File

@ -0,0 +1,83 @@
#!/usr/bin/env python
'''
MIT License
Copyright (c) 2008 Noah Spurrier, Richard Holden, Marco Molteni, Kimberley Burchett, Robert Stone,
Hartmut Goebel, Chad Schroeder, Erick Tryzelaar, Dave Kirby, Ids vander Molen, George Todd,
Noel Taylor, Nicolas D. Cesar, Alexander Gattin, Geoffrey Marshall, Francisco Lourenco, Glen Mabey,
Karthik Gurusamy, Fernando Perez, Corey Minyard, Jon Cohen, Guillaume Chazarain, Andrew Ryan,
Nick Craig-Wood, Andrew Stone, Jorgen Grahn, Benedek Racz
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
import wexpect
import time
import sys
import os
import unittest
from . import PexpectTestCase
here = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, here)
print(wexpect.__version__)
# With quotes (C:\Program Files\Python37\python.exe needs quotes)
python_executable = '"' + sys.executable + '" '
child_script = here + '\\lines_printer.py'
class ReadLineTestCase(PexpectTestCase.PexpectTestCase):
def testReadline(self):
fooPath = python_executable + ' ' + child_script
prompt = ': '
num = 5
# Start the child process
p = wexpect.spawn(fooPath)
# Wait for prompt
p.expect(prompt)
p.sendline(str(num))
p.expect('Bye!\r\n')
expected_lines = p.before.splitlines(True) # Keep the line end
expected_lines += [p.match.group()]
# Start the child process
p = wexpect.spawn(fooPath)
# Wait for prompt
p.expect(prompt)
p.sendline(str(num))
for i in range(num +2): # +1 the line of sendline +1: Bye
line = p.readline()
self.assertEqual(expected_lines[i], line)
# Start the child process
p = wexpect.spawn(fooPath)
# Wait for prompt
p.expect(prompt)
p.sendline(str(num))
readlines_lines = p.readlines()
self.assertEqual(expected_lines, readlines_lines)
if __name__ == '__main__':
unittest.main()
suite = unittest.makeSuite(ReadLineTestCase,'test')

View File

@ -75,6 +75,7 @@ Pexpect is intended for UNIX-like operating systems.""")
#
# Import built in modules
#
import warnings
import logging
import os
import time
@ -110,6 +111,13 @@ except ImportError as e:
screenbufferfillchar = '\4'
maxconsoleY = 8000
warnings.simplefilter("always", category=DeprecationWarning)
no_unix_deprecation_warning = '''
################################## WARNING ##################################
{} is deprecated, and will be removed soon.
Please contact me and report it at github.com/raczben/wexpect if you use it.
################################## WARNING ##################################
'''
# The version is handled by the package: pbr, which derives the version from the git tags.
try:
@ -534,8 +542,10 @@ class spawn_unix (object):
s.append('delayafterterminate: ' + str(self.delayafterterminate))
return '\n'.join(s)
def _spawn(self,command,args=[]):
def _spawn(self,command,args=[]): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::_spawn"), DeprecationWarning)
"""This starts the given command in a child process. This does all the
fork/exec type of stuff for a pty. This is called by __init__. If args
is empty then command will be parsed (split on spaces) and args will be
@ -622,7 +632,9 @@ class spawn_unix (object):
self.terminated = False
self.closed = False
def __fork_pty(self):
def __fork_pty(self): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::__fork_pty"), DeprecationWarning)
"""This implements a substitute for the forkpty system call. This
should be more portable than the pty.fork() function. Specifically,
@ -661,7 +673,8 @@ class spawn_unix (object):
return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
def __pty_make_controlling_tty(self, tty_fd): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::__pty_make_controlling_tty"), DeprecationWarning)
"""This makes the pseudo-terminal the controlling tty. This should be
more portable than the pty.fork() function. Specifically, this should
@ -726,8 +739,9 @@ class spawn_unix (object):
self.closed = True
#self.pid = None
def flush (self): # File-like object.
def flush (self): # pragma: no cover # File-like object.
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::flush"), DeprecationWarning)
"""This does nothing. It is here to support the interface for a
File-like object. """
@ -824,7 +838,8 @@ class spawn_unix (object):
# and blocked on some platforms. TCSADRAIN is probably ideal if it worked.
termios.tcsetattr(self.child_fd, termios.TCSANOW, attr)
def read_nonblocking (self, size = 1, timeout = -1):
def read_nonblocking (self, size = 1, timeout = -1): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::read_nonblocking"), DeprecationWarning)
"""This reads at most size characters from the child application. It
includes a timeout. If the read does not complete within the timeout
@ -995,7 +1010,9 @@ class spawn_unix (object):
for s in sequence:
self.write (s)
def send(self, s):
def send(self, s): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::send"), DeprecationWarning)
"""This sends a string to the child process. This returns the number of
bytes written. If a log file was set then the data is also written to
@ -1167,7 +1184,8 @@ class spawn_unix (object):
raise ExceptionPexpect ('Wait was called for a child process that is stopped. This is not supported. Is some other process attempting job control with our child pid?')
return self.exitstatus
def isalive(self):
def isalive(self): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::isalive"), DeprecationWarning)
"""This tests if the child process is running or not. This is
non-blocking. If the child was terminated then this will read the
@ -1505,7 +1523,8 @@ class spawn_unix (object):
s = struct.pack('HHHH', r, c, 0, 0)
fcntl.ioctl(self.fileno(), TIOCSWINSZ, s)
def interact(self, escape_character = chr(29), input_filter = None, output_filter = None):
def interact(self, escape_character = chr(29), input_filter = None, output_filter = None): # pragma: no cover
warnings.warn(no_unix_deprecation_warning.format("spawn_unix::interact"), DeprecationWarning)
"""This gives control of the child process to the interactive user (the
human at the keyboard). Keystrokes are sent to the child process, and
@ -2045,22 +2064,22 @@ class Wtty:
win32console.AttachConsole(self.conpid)
self.__consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)
self.__consout = self.getConsoleOut()
except Exception as e:
#e = traceback.format_exc()
try:
win32console.AttachConsole(self.__parentPid)
except Exception as ex:
pass
#log(e)
#log(ex)
return
#self.__consin = None
#self.__consout = None
#raise e
except pywintypes.error as e:
# pywintypes.error: (5, 'AttachConsole', 'Access is denied.')
# When child has finished...
logging.info(e)
# In case of any error: We "switch back" (attach) our original console, then raise the
# error.
self.switchBack()
raise EOF('End Of File (EOF) in switchTo().')
except:
# In case of any error: We "switch back" (attach) our original console, then raise the
# error.
self.switchBack()
raise
def switchBack(self):
"""Releases from the current console and attaches
to the parents."""
@ -2157,7 +2176,6 @@ class Wtty:
def readConsole(self, startCo, endCo):
"""Reads the console area from startCo to endCo and returns it
as a string."""
logger.info("STARTED")
buff = []
self.lastRead = 0
@ -2170,20 +2188,15 @@ class Wtty:
if readlen <= 0:
break
logger.info("startOff: %d endOff: %d readlen: %d", startOff, endOff, readlen)
if readlen > 4000:
readlen = 4000
endPoint = self.getCoord(startOff + readlen)
logger.info("endPoint {}".format(endPoint))
s = self.__consout.ReadConsoleOutputCharacter(readlen, startCo)
logger.info("len {}".format(len(s)))
self.lastRead += len(s)
self.totalRead += len(s)
buff.append(s)
logger.info(s.replace(screenbufferfillchar, '*'))
startCo = endPoint
return ''.join(buff)
@ -2210,7 +2223,6 @@ class Wtty:
position and inserts the string into self.__buffer."""
if not self.__consout:
logger.info('self.__consout is False')
return ""
consinfo = self.__consout.GetConsoleScreenBufferInfo()
@ -2241,7 +2253,7 @@ class Wtty:
raw = raw[self.__consSize[0]:]
raw = ''.join(rawlist)
s = self.parseData(raw)
logger.info(s)
logger.debug(s)
for i, line in enumerate(reversed(rawlist)):
if line.endswith(screenbufferfillchar):
# Record the Y offset where the most recent line break was detected