[FIX] flake8 linter first run; Atom first fixes

This commit is contained in:
Benedek Racz
2020-02-06 11:47:56 +01:00
parent 5c005a938b
commit 4bd5dea693
6 changed files with 542 additions and 502 deletions

View File

@@ -9,12 +9,12 @@ requires-python = >=3.4
project_urls =
Source Code = https://github.com/raczben/wexpect
license = MIT
classifier =
classifier =
Development Status :: 4 - Beta
Environment :: Console
Intended Audience :: Developers
Intended Audience :: Information Technology
Operating System :: Microsoft :: Windows
Operating System :: Microsoft :: Windows
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
Programming Language :: Python :: 3.5
@@ -30,3 +30,9 @@ test =
setuptools>=38.0
codecov
twine
[flake8]
max-line-length = 100
ignore =
# E402: Module level import not at top of the file
E402

View File

@@ -1,7 +1,7 @@
# __init__.py
import os
import pkg_resources
import pkg_resources
try:
spawn_class_name = os.environ['WEXPECT_SPAWN_CLASS']
@@ -22,10 +22,10 @@ if spawn_class_name == 'legacy_wexpect':
from .legacy_wexpect import searcher_re
__all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'run', 'split_command_line',
'__version__', 'ConsoleReader', 'join_args', 'searcher_string', 'searcher_re']
'__version__', 'ConsoleReader', 'join_args', 'searcher_string', 'searcher_re']
else:
from .wexpect_util import split_command_line
from .wexpect_util import join_args
from .wexpect_util import ExceptionPexpect
@@ -40,19 +40,19 @@ else:
from .host import run
from .host import searcher_string
from .host import searcher_re
try:
spawn = globals()[spawn_class_name]
except KeyError:
print(f'Error: no spawn class: {spawn_class_name}')
raise
# The version is handled by the package: pbr, which derives the version from the git tags.
try:
__version__ = pkg_resources.require("wexpect")[0].version
except: # pragma: no cover
except Exception: # pragma: no cover
__version__ = '0.0.1.unkowndev0'
__all__ = ['split_command_line', 'join_args', 'ExceptionPexpect', 'EOF', 'TIMEOUT',
'ConsoleReaderSocket', 'ConsoleReaderPipe', 'spawn', 'SpawnSocket', 'SpawnPipe', 'run',
'searcher_string', 'searcher_re', '__version__']
'ConsoleReaderSocket', 'ConsoleReaderPipe', 'spawn', 'SpawnSocket', 'SpawnPipe',
'run', 'searcher_string', 'searcher_re', '__version__']

View File

@@ -1,7 +1,7 @@
"""Wexpect is a Windows variant of pexpect https://pexpect.readthedocs.io.
Wexpect is a Python module for spawning child applications and controlling
them automatically.
them automatically.
console_reader Implements a virtual terminal, and starts the child program.
The main wexpect.Spawn class connect to this class to reach the child's terminal.
@@ -42,7 +42,6 @@ import logging
import os
import traceback
import psutil
import signal
from io import StringIO
import ctypes
@@ -59,38 +58,38 @@ from .wexpect_util import init_logger
from .wexpect_util import EOF_CHAR
from .wexpect_util import SIGNAL_CHARS
#
#
# System-wide constants
#
#
screenbufferfillchar = '\4'
maxconsoleY = 8000
default_port = 4321
#
# Create logger: We write logs only to file. Printing out logs are dangerous, because of the deep
# console manipulation.
#
logger = logging.getLogger('wexpect')
init_logger(logger)
class ConsoleReaderBase:
"""Consol class (aka. client-side python class) for the child.
This class initialize the console starts the child in it and reads the console periodically.
"""
def __init__(self, path, host_pid, codepage=None, window_size_x=80, window_size_y=25,
buffer_size_x=80, buffer_size_y=16000, local_echo=True, interact=False, **kwargs):
"""Initialize the console starts the child in it and reads the console periodically.
"""Initialize the console starts the child in it and reads the console periodically.
Args:
path (str): Child's executable with arguments.
parent_pid (int): Parent (aka. host) process process-ID
codepage (:obj:, optional): Output console code page.
"""
self.lastRead = 0
self.lastRead = 0
self.__bufferY = 0
self.lastReadData = ""
self.totalRead = 0
@@ -107,70 +106,70 @@ class ConsoleReaderBase:
self.child_process = None
self.child_pid = None
self.enable_signal_chars = True
logger.info("ConsoleReader started")
if codepage is None:
codepage = windll.kernel32.GetACP()
try:
logger.info("Setting console output code page to %s" % codepage)
win32console.SetConsoleOutputCP(codepage)
logger.info("Console output code page: %s" % ctypes.windll.kernel32.GetConsoleOutputCP())
logger.info(
"Console output code page: %s" % ctypes.windll.kernel32.GetConsoleOutputCP())
except Exception as e:
logger.info(e)
try:
self.create_connection(**kwargs)
logger.info('Spawning %s' % path)
try:
self.initConsole()
si = win32process.GetStartupInfo()
self.__childProcess, _, self.child_pid, self.__tid = win32process.CreateProcess(None, path, None, None, False,
0, None, None, si)
self.__childProcess, _, self.child_pid, self.__tid = win32process.CreateProcess(
None, path, None, None, False, 0, None, None, si)
self.child_process = psutil.Process(self.child_pid)
logger.info(f'Child pid: {self.child_pid} Console pid: {self.console_pid}')
except:
except Exception:
logger.info(traceback.format_exc())
return
if interact:
self.interact()
self.interact()
self.read_loop()
except:
except Exception:
logger.error(traceback.format_exc())
time.sleep(.1)
finally:
try:
self.terminate_child()
time.sleep(.01)
time.sleep(.01)
self.send_to_host(self.readConsoleToCursor())
self.sendeof()
time.sleep(.1)
self.close_connection()
logger.info('Console finished.')
except:
except Exception:
logger.error(traceback.format_exc())
time.sleep(.1)
def read_loop(self):
paused = False
while True:
if not self.isalive(self.host_process):
logger.info('Host process has been died.')
return
self.child_exitstatus = win32process.GetExitCodeProcess(self.__childProcess)
if self.child_exitstatus != win32con.STILL_ACTIVE:
logger.info(f'Child finished with code: {self.child_exitstatus}')
return
return
consinfo = self.consout.GetConsoleScreenBufferInfo()
cursorPos = consinfo['CursorPosition']
self.send_to_host(self.readConsoleToCursor())
@@ -180,24 +179,24 @@ class ConsoleReaderBase:
else:
logger.spam(f'get_from_host: {s}')
if self.enable_signal_chars:
for sig,char in SIGNAL_CHARS.items():
for sig, char in SIGNAL_CHARS.items():
if char in s:
self.child_process.send_signal(sig)
s = s.decode()
self.write(s)
if cursorPos.Y > maxconsoleY and not paused:
logger.info('cursorPos %s' % cursorPos)
self.suspendThread()
paused = True
if cursorPos.Y <= maxconsoleY and paused:
logger.info('cursorPos %s' % cursorPos)
self.resumeThread()
paused = False
time.sleep(.02)
def terminate_child(self):
try:
if self.child_process:
@@ -205,7 +204,7 @@ class ConsoleReaderBase:
except psutil.NoSuchProcess:
logger.info('The process has already died.')
return
def isalive(self, process):
"""True if the child is still alive, false otherwise"""
try:
@@ -213,10 +212,10 @@ class ConsoleReaderBase:
return False
except psutil.TimeoutExpired:
return True
def write(self, s):
"""Writes input into the child consoles input buffer."""
if len(s) == 0:
return 0
if s[-1] == '\n':
@@ -224,44 +223,45 @@ class ConsoleReaderBase:
records = [self.createKeyEvent(c) for c in str(s)]
if not self.consout:
return ""
# Store the current cursor position to hide characters in local echo disabled mode (workaround).
# Store the current cursor position to hide characters in local echo disabled mode
# (workaround).
consinfo = self.consout.GetConsoleScreenBufferInfo()
startCo = consinfo['CursorPosition']
# Send the string to console input
wrote = self.consin.WriteConsoleInput(records)
# Wait until all input has been recorded by the console.
ts = time.time()
while self.consin.PeekConsoleInput(8) != ():
if time.time() > ts + len(s) * .1 + .5:
break
time.sleep(.05)
# Hide characters in local echo disabled mode (workaround).
if not self.local_echo:
self.consout.FillConsoleOutputCharacter(screenbufferfillchar, len(s), startCo)
return wrote
def createKeyEvent(self, char):
"""Creates a single key record corrosponding to
the ascii character char."""
evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT)
evt.KeyDown = True
evt.Char = char
evt.RepeatCount = 1
return evt
return evt
def initConsole(self, consout=None, window_size_x=80, window_size_y=25, buffer_size_x=80,
buffer_size_y=16000):
if not consout:
consout=self.getConsoleOut()
consout = self.getConsoleOut()
self.consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)
rect = win32console.PySMALL_RECTType(0, 0, window_size_x-1, window_size_y-1)
consout.SetConsoleWindowInfo(True, rect)
size = win32console.PyCOORDType(buffer_size_x, buffer_size_y)
@@ -269,18 +269,18 @@ class ConsoleReaderBase:
pos = win32console.PyCOORDType(0, 0)
# Use NUL as fill char because it displays as whitespace
# (if we interact() with the child)
consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos)
consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos)
consinfo = consout.GetConsoleScreenBufferInfo()
self.__consSize = consinfo['Size']
logger.info('self.__consSize: ' + str(self.__consSize))
self.startCursorPos = consinfo['CursorPosition']
self.startCursorPos = consinfo['CursorPosition']
def parseData(self, s):
"""Ensures that special characters are interpretted as
newlines or blanks, depending on if there written over
characters or screen-buffer-fill characters."""
strlist = []
for i, c in enumerate(s):
if c == screenbufferfillchar:
@@ -291,44 +291,45 @@ class ConsoleReaderBase:
s = ''.join(strlist)
return s
def getConsoleOut(self):
consfile = win32file.CreateFile('CONOUT$',
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
0,
0)
consfile = win32file.CreateFile(
'CONOUT$',
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
0,
0)
self.consout = win32console.PyConsoleScreenBufferType(consfile)
return self.consout
def getCoord(self, offset):
"""Converts an offset to a point represented as a tuple."""
x = offset % self.__consSize.X
x = offset % self.__consSize.X
y = offset // self.__consSize.X
return win32console.PyCOORDType(x, y)
def getOffset(self, coord):
"""Converts a tuple-point to an offset."""
return coord.X + coord.Y * self.__consSize.X
def readConsole(self, startCo, endCo):
"""Reads the console area from startCo to endCo and returns it
as a string."""
if startCo is None:
startCo = self.startCursorPos
startCo = self.startCursorPos
startCo.Y = startCo.Y
if endCo is None:
consinfo = self.consout.GetConsoleScreenBufferInfo()
endCo = consinfo['CursorPosition']
endCo= self.getCoord(0 + self.getOffset(endCo))
endCo = self.getCoord(0 + self.getOffset(endCo))
buff = []
self.lastRead = 0
@@ -336,10 +337,10 @@ class ConsoleReaderBase:
startOff = self.getOffset(startCo)
endOff = self.getOffset(endCo)
readlen = endOff - startOff
if readlen <= 0:
break
if readlen > 4000:
readlen = 4000
endPoint = self.getCoord(startOff + readlen)
@@ -352,34 +353,34 @@ class ConsoleReaderBase:
startCo = endPoint
return ''.join(buff)
def readConsoleToCursor(self):
"""Reads from the current read position to the current cursor
position and inserts the string into self.__buffer."""
if not self.consout:
return ""
consinfo = self.consout.GetConsoleScreenBufferInfo()
cursorPos = consinfo['CursorPosition']
logger.spam('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo))
isSameX = cursorPos.X == self.__currentReadCo.X
isSameY = cursorPos.Y == self.__currentReadCo.Y
isSamePos = isSameX and isSameY
logger.spam('isSameY: %r' % isSameY)
logger.spam('isSamePos: %r' % isSamePos)
if isSameY or not self.lastReadData.endswith('\r\n'):
# Read the current slice again
self.totalRead -= self.lastRead
self.__currentReadCo.X = 0
self.__currentReadCo.Y = self.__bufferY
logger.spam('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo))
raw = self.readConsole(self.__currentReadCo, cursorPos)
rawlist = []
while raw:
@@ -392,17 +393,17 @@ class ConsoleReaderBase:
# Record the Y offset where the most recent line break was detected
self.__bufferY += len(rawlist) - i
break
logger.spam('lastReadData: %r' % self.lastReadData)
if s:
logger.debug('Read: %r' % s)
else:
logger.spam('Read: %r' % s)
if isSamePos and self.lastReadData == s:
logger.spam('isSamePos and self.lastReadData == s')
s = ''
if s:
lastReadData = self.lastReadData
pos = self.getOffset(self.__currentReadCo)
@@ -430,46 +431,46 @@ class ConsoleReaderBase:
self.__currentReadCo.Y = cursorPos.Y
return s
def interact(self):
"""Displays the child console for interaction."""
logger.debug('Start interact window')
win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_SHOW)
def sendeof(self):
"""This sends an EOF to the host. This sends a character which inform the host that child
has been finished, and all of it's output has been send to host.
"""
self.send_to_host(EOF_CHAR)
class ConsoleReaderSocket(ConsoleReaderBase):
class ConsoleReaderSocket(ConsoleReaderBase):
def create_connection(self, **kwargs):
try:
try:
self.port = kwargs['port']
# Create a TCP/IP socket
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = ('localhost', self.port)
self.sock.bind(server_address)
logger.info(f'Socket started at port: {self.port}')
# Listen for incoming connections
self.sock.settimeout(5)
self.sock.listen(1)
self.connection, client_address = self.sock.accept()
self.connection.settimeout(.01)
logger.info(f'Client connected: {client_address}')
except:
except Exception:
logger.error(f"Port: {self.port}")
raise
def close_connection(self):
if self.connection:
self.connection.close()
def send_to_host(self, msg):
# convert to bytes
if isinstance(msg, str):
@@ -479,7 +480,7 @@ class ConsoleReaderSocket(ConsoleReaderBase):
else:
logger.spam(f'Sending msg: {msg}')
self.connection.sendall(msg)
def get_from_host(self):
try:
msg = self.connection.recv(4096)
@@ -498,8 +499,8 @@ class ConsoleReaderSocket(ConsoleReaderBase):
else:
# got a message do something :)
return msg
class ConsoleReaderPipe(ConsoleReaderBase):
def create_connection(self, **kwargs):
pipe_name = 'wexpect_{}'.format(self.console_pid)
@@ -513,11 +514,11 @@ class ConsoleReaderPipe(ConsoleReaderBase):
logger.info("waiting for client")
win32pipe.ConnectNamedPipe(self.pipe, None)
logger.info('got client')
def close_connection(self):
if self.pipe:
win32file.CloseHandle(self.pipe)
def send_to_host(self, msg):
# convert to bytes
if isinstance(msg, str):
@@ -527,7 +528,7 @@ class ConsoleReaderPipe(ConsoleReaderBase):
else:
logger.spam(f'Sending msg: {msg}')
win32file.WriteFile(self.pipe, msg)
def get_from_host(self):
data, avail, bytes_left = win32pipe.PeekNamedPipe(self.pipe, 4096)
logger.spam(f'data: {data} avail:{avail} bytes_left{bytes_left}')

View File

@@ -7,7 +7,7 @@ scripts for duplicating software package installations on different servers. It
can be used for automated software testing. Wexpect is in the spirit of Don
Libes' Expect, but Wexpect is pure Python. Other Expect-like modules for Python
require TCL and Expect or require C extensions to be compiled. Wexpect does not
use C, Expect, or TCL extensions.
use C, Expect, or TCL extensions.
There are two main interfaces to Wexpect -- the function, run() and the class,
spawn. You can call the run() function to execute a command and return the
@@ -92,12 +92,12 @@ from .wexpect_util import EOF_CHAR
from .wexpect_util import SIGNAL_CHARS
logger = logging.getLogger('wexpect')
init_logger(logger)
def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None, logfile=None,
cwd=None, env=None, **kwargs):
def run(command, timeout=-1, withexitstatus=False, events=None, extra_args=None, logfile=None,
cwd=None, env=None, **kwargs):
"""
This function runs the given command; waits for it to finish; then
returns all output as a string. STDERR is included in output. If the full
@@ -162,21 +162,22 @@ def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None
if timeout == -1:
child = spawn(command, maxread=2000, logfile=logfile, cwd=cwd, env=env, **kwargs)
else:
child = spawn(command, timeout=timeout, maxread=2000, logfile=logfile, cwd=cwd, env=env, **kwargs)
child = spawn(
command, timeout=timeout, maxread=2000, logfile=logfile, cwd=cwd, env=env, **kwargs)
if events is not None:
patterns = list(events.keys())
responses = list(events.values())
else:
patterns=None # We assume that EOF or TIMEOUT will save us.
responses=None
patterns = None # We assume that EOF or TIMEOUT will save us.
responses = None
child_result_list = []
event_count = 0
while 1:
try:
index = child.expect (patterns)
index = child.expect(patterns)
if type(child.after) in (str,):
child_result_list.append(child.before + child.after)
else: # child.after may have been a TIMEOUT or EOF, so don't cat those.
else: # child.after may have been a TIMEOUT or EOF, so don't cat those.
child_result_list.append(child.before)
if type(responses[index]) in (str,):
child.send(responses[index])
@@ -189,7 +190,7 @@ def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None
break
else:
logger.warning("TypeError ('The callback must be a string or function type.')")
raise TypeError ('The callback must be a string or function type.')
raise TypeError('The callback must be a string or function type.')
event_count = event_count + 1
except TIMEOUT:
child_result_list.append(child.before)
@@ -204,15 +205,16 @@ def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None
else:
return child_result
class SpawnBase:
def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None,
logfile=None, cwd=None, env=None, codepage=None, echo=True, safe_exit=True, interact=False, **kwargs):
logfile=None, cwd=None, env=None, codepage=None, echo=True, safe_exit=True,
interact=False, **kwargs):
"""This starts the given command in a child process. This does all the
fork/exec type of stuff for a pty. This is called by __init__. If args
is empty then command will be parsed (split on spaces) and args will be
set to parsed arguments.
set to parsed arguments.
The pid and child_fd of this object get set by this method.
Note that it is difficult for this method to fail.
You cannot detect if the child process cannot start.
@@ -222,12 +224,12 @@ class SpawnBase:
That may not necessarily be bad because you may haved spawned a child
that performs some task; creates no stdout output; and then dies.
"""
self.host_pid = os.getpid() # That's me
self.host_pid = os.getpid() # That's me
self.console_process = None
self.console_pid = None
self.child_process = None
self.child_pid = None
self.safe_exit = safe_exit
self.searcher = None
self.ignorecase = False
@@ -237,23 +239,26 @@ class SpawnBase:
self.match_index = None
self.terminated = True
self.exitstatus = None
self.status = None # status returned by os.waitpid
self.status = None # status returned by os.waitpid
self.flag_eof = False
self.flag_child_finished = False
self.child_fd = -1 # initially closed
self.child_fd = -1 # initially closed
self.timeout = timeout
self.delimiter = EOF
self.codepage = codepage
self.cwd = cwd
self.env = env
self.echo = echo
self.maxread = maxread # max bytes to read at one time into buffer
self.delaybeforesend = 0.1 # Sets sleep time used just before sending data to child. Time in seconds.
self.delayafterterminate = 0.1 # Sets delay in terminate() method to allow kernel time to update process status. Time in seconds.
self.buffer = '' # This is the read buffer. See maxread.
self.searchwindowsize = searchwindowsize # Anything before searchwindowsize point is preserved, but not searched.
self.maxread = maxread # max bytes to read at one time into buffer
# delaybeforesend: Sets sleep time used just before sending data to child. Time in seconds.
self.delaybeforesend = 0.1
# delayafterterminate: Sets delay in terminate() method to allow kernel time to update
# process status. Time in seconds.
self.delayafterterminate = 0.1
self.buffer = '' # This is the read buffer. See maxread.
# searchwindowsize: Anything before searchwindowsize point is preserved, but not searched.
self.searchwindowsize = searchwindowsize
self.interact_state = interact
# If command is an int type then it may represent a file descriptor.
if type(command) == type(0):
@@ -263,36 +268,37 @@ class SpawnBase:
if type (args) != type([]):
logger.warning("TypeError ('The argument, args, must be a list.')")
raise TypeError ('The argument, args, must be a list.')
if args == []:
self.args = split_command_line(command)
self.command = self.args[0]
else:
self.args = args[:] # work with a copy
self.args.insert (0, command)
self.command = command
self.args = args[:] # work with a copy
self.args.insert(0, command)
self.command = command
command_with_path = shutil.which(self.command)
if command_with_path is None:
logger.warning('The command was not found or was not executable: %s.' % self.command)
raise ExceptionPexpect ('The command was not found or was not executable: %s.' % self.command)
raise ExceptionPexpect(
'The command was not found or was not executable: %s.' % self.command)
self.command = command_with_path
self.args[0] = self.command
self.name = '<' + ' '.join (self.args) + '>'
self.name = '<' + ' '.join(self.args) + '>'
self.terminated = False
self.closed = False
self.child_fd = self.startChild(self.args, self.env)
self.get_child_process()
logger.info(f'Child pid: {self.child_pid} Console pid: {self.console_pid}')
self.connect_to_child()
def __del__(self):
"""This makes sure that no system resources are left open. Python only
garbage collects Python objects, not the child console."""
try:
logger.info('Deleting...')
if self.child_process is not None:
@@ -300,10 +306,10 @@ class SpawnBase:
self.disconnect_from_child()
if self.safe_exit:
self.wait()
except:
except Exception:
traceback.print_exc()
logger.warning(traceback.format_exc())
def __str__(self):
"""This returns a human-readable string that represents the state of
the object. """
@@ -336,9 +342,9 @@ class SpawnBase:
si = win32process.GetStartupInfo()
si.dwFlags = win32process.STARTF_USESHOWWINDOW
si.wShowWindow = win32con.SW_HIDE
dirname = os.path.dirname(sys.executable
if getattr(sys, 'frozen', False) else
dirname = os.path.dirname(sys.executable
if getattr(sys, 'frozen', False) else
os.path.abspath(__file__))
spath = [os.path.dirname(dirname)]
pyargs = ['-c']
@@ -346,33 +352,35 @@ class SpawnBase:
# If we are running 'frozen', add library.zip and lib\library.zip to sys.path
# py2exe: Needs appropriate 'zipfile' option in setup script and 'bundle_files' 3
spath.append(os.path.join(dirname, 'library.zip'))
spath.append(os.path.join(dirname, 'library.zip',
spath.append(os.path.join(dirname, 'library.zip',
os.path.basename(os.path.splitext(sys.executable)[0])))
if os.path.isdir(os.path.join(dirname, 'lib')):
dirname = os.path.join(dirname, 'lib')
spath.append(os.path.join(dirname, 'library.zip'))
spath.append(os.path.join(dirname, 'library.zip',
spath.append(os.path.join(dirname, 'library.zip',
os.path.basename(os.path.splitext(sys.executable)[0])))
pyargs.insert(0, '-S') # skip 'import site'
if getattr(sys, 'frozen', False):
python_executable = os.path.join(dirname, 'python.exe')
python_executable = os.path.join(dirname, 'python.exe')
else:
python_executable = os.path.join(os.path.dirname(sys.executable), 'python.exe')
self.console_class_parameters.update({
self.console_class_parameters.update(
{
'host_pid': self.host_pid,
'local_echo': self.echo,
'interact': self.interact_state,
'codepage': self.codepage
})
console_class_parameters_kv_pairs = [f'{k}={v}' for k,v in self.console_class_parameters.items() ]
}
)
console_class_parameters_kv_pairs = [f'{k}={v}' for k, v in self.console_class_parameters.items()]
console_class_parameters_str = ', '.join(console_class_parameters_kv_pairs)
child_class_initializator = f"cons = wexpect.{self.console_class_name}(wexpect.join_args({args}), {console_class_parameters_str});"
commandLine = '"%s" %s "%s"' % (python_executable,
' '.join(pyargs),
commandLine = '"%s" %s "%s"' % (python_executable,
' '.join(pyargs),
"import sys;"
f"sys.path = {spath} + sys.path;"
"import wexpect;"
@@ -382,64 +390,64 @@ class SpawnBase:
"wexpect.console_reader.logger.info(f'Console finished2. {cons.child_exitstatus}');"
"sys.exit(cons.child_exitstatus)"
)
logger.info(f'Console starter command:{commandLine}')
_, _, self.console_pid, __otid = win32process.CreateProcess(None, commandLine, None, None, False,
win32process.CREATE_NEW_CONSOLE, None, self.cwd, si)
_, _, self.console_pid, __otid = win32process.CreateProcess(
None, commandLine, None, None, False, win32process.CREATE_NEW_CONSOLE, None, self.cwd, si)
def get_console_process(self, force=False):
if force or self.console_process is None:
self.console_process = psutil.Process(self.console_pid)
return self.console_process
def get_child_process(self, force=False):
if force or self.console_process is None:
self.child_process = self.get_console_process()
self.child_pid = self.child_process.pid
return self.child_process
def close(self): # File-like object.
""" Closes the child console."""
self.closed = self.terminate()
def terminate(self, force=False):
"""Terminate the child. Force not used. """
if not self.isalive():
return True
self.kill()
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
return False
def isalive(self, trust_console=True):
"""True if the child is still alive, false otherwise"""
if trust_console:
if self.flag_eof:
return False
if self.child_process is None:
# Child process has not been started... Not alive
return False
try:
self.exitstatus = self.child_process.wait(timeout=0)
logger.info(f'exitstatus: {self.exitstatus}')
except psutil.TimeoutExpired:
return True
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
try:
self.child_process.send_signal(sig)
except psutil.NoSuchProcess as e:
logger.info('Child has already died. %s', e)
def wait(self, child=True, console=False):
if child:
self.exitstatus = self.child_process.wait()
@@ -448,8 +456,8 @@ class SpawnBase:
self.exitstatus = self.console_process.wait()
logger.info(f'exitstatus: {self.exitstatus}')
return self.exitstatus
def read (self, size = -1): # File-like object.
def read(self, size=-1): # File-like object.
"""This reads at most "size" bytes from the file (less if the read hits
EOF before obtaining size bytes). If the size argument is negative or
omitted, read all data until EOF is reached. The bytes are returned as
@@ -459,7 +467,7 @@ class SpawnBase:
if size == 0:
return ''
if size < 0:
self.expect (self.delimiter) # delimiter default is EOF
self.expect(self.delimiter) # delimiter default is EOF
return self.before
# I could have done this more directly by not using expect(), but
@@ -470,12 +478,12 @@ class SpawnBase:
# Note, it's OK if size==-1 in the regex. That just means it
# will never match anything in which case we stop only on EOF.
cre = re.compile('.{%d}' % size, re.DOTALL)
index = self.expect ([cre, self.delimiter]) # delimiter default is EOF
index = self.expect([cre, self.delimiter]) # delimiter default is EOF
if index == 0:
return self.after ### self.before should be ''. Should I assert this?
return self.after # self.before should be ''. Should I assert this?
return self.before
def readline (self, size = -1): # File-like object.
def readline(self, size=-1): # File-like object.
"""This reads and returns one entire line. A trailing newline is kept
in the string, but may be absent when a file ends with an incomplete
line. Note: This readline() looks for a \\r\\n pair even on UNIX
@@ -487,24 +495,24 @@ class SpawnBase:
if size == 0:
return ''
index = self.expect (['\r\n', self.delimiter]) # delimiter default is EOF
index = self.expect(['\r\n', self.delimiter]) # delimiter default is EOF
if index == 0:
return self.before + '\r\n'
else:
return self.before
def __iter__ (self): # File-like object.
def __iter__(self): # File-like object.
"""This is to support iterators over a file-like object.
"""
return self
def read_nonblocking (self, size = 1):
def read_nonblocking(self, size=1):
"""Virtual definition
"""
raise NotImplementedError
def __next__ (self): # File-like object.
def __next__(self): # File-like object.
"""This is to support iterators over a file-like object.
"""
@@ -519,7 +527,7 @@ class SpawnBase:
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
def readlines (self, sizehint = -1): # File-like object.
def readlines(self, sizehint=-1): # File-like object.
"""This reads until EOF using readline() and returns a list containing
the lines thus read. The optional "sizehint" argument is ignored. """
@@ -533,17 +541,16 @@ class SpawnBase:
def isatty(self): # File-like object.
"""The child is always created with a console."""
return True
def write(self, s): # File-like object.
"""This is similar to send() except that there is no return value.
"""
self.send(s)
def writelines (self, sequence): # File-like object.
def writelines(self, sequence): # File-like object.
"""This calls write() for each element in the sequence. The sequence
can be any iterable object producing strings, typically a list of
strings. This does not add line separators There is no return value.
@@ -553,11 +560,10 @@ class SpawnBase:
self.write(s)
def sendline(self, s=''):
"""This is like send(), but it adds a line feed (os.linesep). This
returns the number of bytes written. """
n = self.send(s+'\r\n')
n = self.send(s + '\r\n')
return n
def sendeof(self):
@@ -573,33 +579,33 @@ class SpawnBase:
# platform does not define VEOF so assume CTRL-D
char = chr(4)
self.send(char)
def send(self, s, delaybeforesend=None):
"""Virtual definition
"""
if delaybeforesend is None:
delaybeforesend = self.delaybeforesend
if delaybeforesend:
time.sleep(delaybeforesend)
return self._send_impl(s)
def _send_impl(self, s):
"""Virtual definition
"""
raise NotImplementedError
def connect_to_child(self):
"""Virtual definition
"""
raise NotImplementedError
def disconnect_from_child(self):
"""Virtual definition
"""
raise NotImplementedError
def compile_pattern_list(self, patterns):
"""This compiles a pattern-string or a list of pattern-strings.
Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or a list of
@@ -629,7 +635,7 @@ class SpawnBase:
if type(patterns) is not list:
patterns = [patterns]
compile_flags = re.DOTALL # Allow dot to match \n
compile_flags = re.DOTALL # Allow dot to match \n
if self.ignorecase:
compile_flags = compile_flags | re.IGNORECASE
compiled_pattern_list = []
@@ -643,12 +649,16 @@ class SpawnBase:
elif type(p) is type(re.compile('')):
compiled_pattern_list.append(p)
else:
logger.warning("TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or a list of those type. %s' % str(type(p)))")
raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or a list of those type. %s' % str(type(p)))
logger.warning(
"TypeError: 'Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or"
" a list of those type. %s' % str(type(p))")
raise TypeError(
'Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or a list of'
' those type. %s' % str(type(p)))
return compiled_pattern_list
def expect(self, pattern, timeout = -1, searchwindowsize=None):
def expect(self, pattern, timeout=-1, searchwindowsize=None):
"""This seeks through the stream until a pattern is matched. The
pattern is overloaded and may take several types. The pattern can be a
StringType, EOF, a compiled re, or a list of any of those types.
@@ -727,7 +737,7 @@ class SpawnBase:
compiled_pattern_list = self.compile_pattern_list(pattern)
return self.expect_list(compiled_pattern_list, timeout, searchwindowsize)
def expect_list(self, pattern_list, timeout = -1, searchwindowsize = -1):
def expect_list(self, pattern_list, timeout=-1, searchwindowsize=-1):
"""This takes a list of compiled regular expressions and returns the
index into the pattern_list that matched the child output. The list may
also contain EOF or TIMEOUT (which are not compiled regular
@@ -740,7 +750,7 @@ class SpawnBase:
return self.expect_loop(searcher_re(pattern_list), timeout, searchwindowsize)
def expect_exact(self, pattern_list, timeout = -1, searchwindowsize = -1):
def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1):
"""This is similar to expect(), but uses plain string matching instead
of compiled regular expressions in 'pattern_list'. The 'pattern_list'
may be a string; a list or other sequence of strings; or TIMEOUT and
@@ -753,17 +763,21 @@ class SpawnBase:
This method is also useful when you don't want to have to worry about
escaping regular expression characters that you want to match."""
if not isinstance(pattern_list, list):
if not isinstance(pattern_list, list):
pattern_list = [pattern_list]
for p in pattern_list:
if type(p) not in (str,) and p not in (TIMEOUT, EOF):
logger.warning('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p)))
raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p)))
logger.warning(
'TypeError: Argument must be one of StringTypes, EOF, TIMEOUT, or a list of'
' those type. %s' % str(type(p)))
raise TypeError(
'Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. '
'%s' % str(type(p)))
return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def expect_loop(self, searcher, timeout = -1, searchwindowsize = -1):
def expect_loop(self, searcher, timeout=-1, searchwindowsize=-1):
"""This is the common loop used inside expect. The 'searcher' should be
an instance of searcher_re or searcher_string, which describes how and what
to search for in the input.
@@ -775,33 +789,33 @@ class SpawnBase:
if timeout == -1:
timeout = self.timeout
if timeout is not None:
end_time = time.time() + timeout
end_time = time.time() + timeout
if searchwindowsize == -1:
searchwindowsize = self.searchwindowsize
logger.debug(f'searcher: {searcher}')
try:
incoming = self.buffer
freshlen = len(incoming)
while True: # Keep reading until exception or return.
while True: # Keep reading until exception or return.
index = searcher.search(incoming, freshlen, searchwindowsize)
if index >= 0:
self.buffer = incoming[searcher.end : ]
self.before = incoming[ : searcher.start]
self.after = incoming[searcher.start : searcher.end]
self.buffer = incoming[searcher.end:]
self.before = incoming[:searcher.start]
self.after = incoming[searcher.start:searcher.end]
self.match = searcher.match
self.match_index = index
return self.match_index
# No match at this point
if timeout is not None and end_time < time.time():
logger.info('Timeout exceeded in expect_any().')
raise TIMEOUT ('Timeout exceeded in expect_any().')
# Still have time left, so read more data
raise TIMEOUT('Timeout exceeded in expect_any().')
# Still have time left, so read more data
self.isalive()
c = self.read_nonblocking(self.maxread)
freshlen = len(c)
time.sleep (0.01)
time.sleep(0.01)
incoming += c
except EOF as e:
self.buffer = ''
@@ -831,26 +845,32 @@ class SpawnBase:
self.match_index = None
logger.info(f'TIMEOUT: {e}\n{self}')
raise TIMEOUT(f'{e}\n{self}')
except:
except Exception:
self.before = incoming
self.after = None
self.match = None
self.match_index = None
raise
class SpawnPipe(SpawnBase):
def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None,
logfile=None, cwd=None, env=None, codepage=None, echo=True, interact=False, **kwargs):
logfile=None, cwd=None, env=None, codepage=None, echo=True, interact=False,
**kwargs):
self.pipe = None
self.console_class_name = 'ConsoleReaderPipe'
self.console_class_parameters = {}
super().__init__(command=command, args=args, timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo, interact=interact)
self.delayafterterminate = 1 # Sets delay in terminate() method to allow kernel time to update process status. Time in seconds.
super().__init__(
command=command, args=args, timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo,
interact=interact)
# Sets delay in terminate() method to allow kernel time to update process status. Time in
# seconds.
self.delayafterterminate = 1
def connect_to_child(self):
pipe_name = 'wexpect_{}'.format(self.console_pid)
pipe_full_path = r'\\.\pipe\{}'.format(pipe_name)
@@ -867,22 +887,23 @@ class SpawnPipe(SpawnBase):
None
)
logger.debug('Pipe found')
res = win32pipe.SetNamedPipeHandleState(self.pipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)
res = win32pipe.SetNamedPipeHandleState(self.pipe, win32pipe.PIPE_READMODE_MESSAGE,
None, None)
if res == 0:
logger.debug(f"SetNamedPipeHandleState return code: {res}")
return
except pywintypes.error as e:
if e.args[0] == winerror.ERROR_FILE_NOT_FOUND: #2
if e.args[0] == winerror.ERROR_FILE_NOT_FOUND: # 2
logger.debug("no pipe, trying again in a bit later")
time.sleep(0.2)
else:
raise
def disconnect_from_child(self):
if self.pipe:
win32file.CloseHandle(self.pipe)
def read_nonblocking (self, size = 1):
def read_nonblocking(self, size=1):
"""This reads at most size characters from the child application. If
the end of file is read then an EOF exception will be raised.
@@ -895,24 +916,24 @@ class SpawnPipe(SpawnBase):
if self.closed:
logger.warning('I/O operation on closed file in read_nonblocking().')
raise ValueError ('I/O operation on closed file in read_nonblocking().')
raise ValueError('I/O operation on closed file in read_nonblocking().')
try:
s = win32file.ReadFile(self.pipe, size)[1]
if s:
logger.debug(f'Readed: {s}')
else:
logger.spam(f'Readed: {s}')
if b'\x04' in s:
self.flag_eof = True
logger.info("EOF: EOF character has been arrived")
raise EOF('EOF character has been arrived')
return s.decode()
except pywintypes.error as e:
if e.args[0] == winerror.ERROR_BROKEN_PIPE: #109
if e.args[0] == winerror.ERROR_BROKEN_PIPE: # 109
self.flag_eof = True
logger.info("EOF('broken pipe, bye bye')")
raise EOF('broken pipe, bye bye')
@@ -924,7 +945,7 @@ class SpawnPipe(SpawnBase):
raise EOF('The pipe is being closed.')
else:
raise
def _send_impl(self, s):
"""This sends a string to the child process. This returns the number of
bytes written. If a log file was set then the data is also written to
@@ -937,7 +958,7 @@ class SpawnPipe(SpawnBase):
win32file.WriteFile(self.pipe, s)
logger.spam(f"WriteFile finished.")
except pywintypes.error as e:
if e.args[0] == winerror.ERROR_BROKEN_PIPE: #109
if e.args[0] == winerror.ERROR_BROKEN_PIPE: # 109
logger.info("EOF: broken pipe, bye bye")
raise EOF("broken pipe, bye bye")
elif e.args[0] == winerror.ERROR_NO_DATA:
@@ -947,9 +968,9 @@ class SpawnPipe(SpawnBase):
logger.info("The pipe is being closed.")
raise EOF("The pipe is being closed.")
else:
raise
raise
return len(s)
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
try:
@@ -961,20 +982,25 @@ class SpawnPipe(SpawnBase):
class SpawnSocket(SpawnBase):
def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None,
logfile=None, cwd=None, env=None, codepage=None, echo=True, port=4321, host='127.0.0.1', interact=False):
logfile=None, cwd=None, env=None, codepage=None, echo=True, port=4321,
host='127.0.0.1', interact=False):
self.port = port
self.host = host
self.sock = None
self.console_class_name = 'ConsoleReaderSocket'
self.console_class_parameters = {'port': port}
super().__init__(command=command, args=args, timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo, interact=interact)
self.delayafterterminate = 1 # Sets delay in terminate() method to allow kernel time to update process status. Time in seconds.
super().__init__(
command=command, args=args, timeout=timeout, maxread=maxread,
searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo,
interact=interact)
# Sets delay in terminate() method to allow kernel time to update process status. Time in
# seconds.
self.delayafterterminate = 1
def _send_impl(self, s):
"""This sends a string to the child process. This returns the number of
bytes written. If a log file was set then the data is also written to
@@ -983,18 +1009,18 @@ class SpawnSocket(SpawnBase):
s = str.encode(s)
self.sock.sendall(s)
return len(s)
def connect_to_child(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self.sock.settimeout(.2)
def disconnect_from_child(self):
if self.sock:
self.sock.close()
self.sock = None
def read_nonblocking (self, size = 1):
def read_nonblocking(self, size=1):
"""This reads at most size characters from the child application. If
the end of file is read then an EOF exception will be raised.
@@ -1007,22 +1033,21 @@ class SpawnSocket(SpawnBase):
if self.closed:
logger.info('I/O operation on closed file in read_nonblocking().')
raise ValueError ('I/O operation on closed file in read_nonblocking().')
raise ValueError('I/O operation on closed file in read_nonblocking().')
try:
s = self.sock.recv(size)
if s:
logger.debug(f'Readed: {s}')
else:
logger.spam(f'Readed: {s}')
if EOF_CHAR in s:
self.flag_eof = True
logger.info("EOF: EOF character has been arrived")
raise EOF('EOF character has been arrived')
except ConnectionResetError:
self.flag_eof = True
logger.info("EOF('ConnectionResetError')")
@@ -1031,7 +1056,7 @@ class SpawnSocket(SpawnBase):
return ''
return s.decode()
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
try:
@@ -1039,7 +1064,7 @@ class SpawnSocket(SpawnBase):
self.send(SIGNAL_CHARS[sig])
except EOF as e:
logger.info(e)
class searcher_re (object):
"""This is regular expression string search helper for the
@@ -1080,12 +1105,12 @@ class searcher_re (object):
"""This returns a human-readable string that represents the state of
the object."""
ss = [ (n,' %d: re.compile("%s")' % (n,str(s.pattern))) for n,s in self._searches]
ss.append((-1,'searcher_re:'))
ss = [(n, ' %d: re.compile("%s")' % (n, str(s.pattern))) for n, s in self._searches]
ss.append((-1, 'searcher_re:'))
if self.eof_index >= 0:
ss.append ((self.eof_index,' %d: EOF' % self.eof_index))
ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
if self.timeout_index >= 0:
ss.append ((self.timeout_index,' %d: TIMEOUT' % self.timeout_index))
ss.append((self.timeout_index, ' %d: TIMEOUT' % self.timeout_index))
ss.sort()
ss = list(zip(*ss))[1]
return '\n'.join(ss)
@@ -1096,7 +1121,7 @@ class searcher_re (object):
'buffer' which have not been searched before.
See class spawn for the 'searchwindowsize' argument.
If there is a match this returns the index of that string, and sets
'start', 'end' and 'match'. Otherwise, returns -1."""
@@ -1107,7 +1132,7 @@ class searcher_re (object):
if searchwindowsize is None:
searchstart = 0
else:
searchstart = max(0, len(buffer)-searchwindowsize)
searchstart = max(0, len(buffer) - searchwindowsize)
for index, s in self._searches:
match = s.search(buffer, searchstart)
if match is None:
@@ -1123,8 +1148,8 @@ class searcher_re (object):
self.match = the_match
self.end = self.match.end()
return best_index
class searcher_string (object):
"""This is a plain string search helper for the spawn.expect_any() method.
@@ -1161,12 +1186,12 @@ class searcher_string (object):
"""This returns a human-readable string that represents the state of
the object."""
ss = [ (ns[0],' %d: "%s"' % ns) for ns in self._strings ]
ss.append((-1,'searcher_string:'))
ss = [(ns[0], ' %d: "%s"' % ns) for ns in self._strings]
ss.append((-1, 'searcher_string:'))
if self.eof_index >= 0:
ss.append ((self.eof_index,' %d: EOF' % self.eof_index))
ss.append((self.eof_index, ' %d: EOF' % self.eof_index))
if self.timeout_index >= 0:
ss.append ((self.timeout_index,' %d: TIMEOUT' % self.timeout_index))
ss.append((self.timeout_index, ' %d: TIMEOUT' % self.timeout_index))
ss.sort()
ss = list(zip(*ss))[1]
return '\n'.join(ss)
@@ -1196,12 +1221,12 @@ class searcher_string (object):
# rescanning until we've read three more bytes.
#
# Sadly, I don't know enough about this interesting topic. /grahn
for index, s in self._strings:
if searchwindowsize is None:
# the match, if any, can only be in the fresh data,
# or at the very end of the old data
offset = -(freshlen+len(s))
offset = -(freshlen + len(s))
else:
# better obey searchwindowsize
offset = -searchwindowsize

View File

@@ -7,7 +7,7 @@ scripts for duplicating software package installations on different servers. It
can be used for automated software testing. Wexpect is in the spirit of Don
Libes' Expect, but Wexpect is pure Python. Other Expect-like modules for Python
require TCL and Expect or require C extensions to be compiled. Wexpect does not
use C, Expect, or TCL extensions.
use C, Expect, or TCL extensions.
There are two main interfaces to Wexpect -- the function, run() and the class,
spawn. You can call the run() function to execute a command and return the
@@ -80,7 +80,7 @@ import shutil
import types
import traceback
import signal
import pkg_resources
import pkg_resources
from io import StringIO
try:
@@ -96,9 +96,9 @@ try:
except ImportError as e: # pragma: no cover
raise ImportError(str(e) + "\nThis package requires the win32 python packages.\r\nInstall with pip install pywin32")
#
#
# System-wide constants
#
#
screenbufferfillchar = '\4'
maxconsoleY = 8000
@@ -368,9 +368,9 @@ def spawn(command, args=[], timeout=30, maxread=2000, searchwindowsize=None, log
logger.debug('\t%s=%s' % (name, env[name]))
if cwd:
logger.debug('Working directory: %s' % cwd)
return spawn_windows(command, args, timeout, maxread, searchwindowsize, logfile, cwd, env,
codepage, echo=echo)
codepage, echo=echo)
class spawn_windows ():
"""This is the main class interface for Wexpect. Use this class to start
@@ -381,7 +381,7 @@ class spawn_windows ():
""" The spawn_windows constructor. Do not call it directly. Use spawn(), or run() instead.
"""
self.codepage = codepage
self.stdin = sys.stdin
self.stdout = sys.stdout
self.stderr = sys.stderr
@@ -414,7 +414,7 @@ class spawn_windows ():
self.ocwd = os.getcwd()
self.cwd = cwd
self.env = env
# allow dummy instances for subclasses that may not use command or args.
if command is None:
self.command = None
@@ -426,12 +426,12 @@ class spawn_windows ():
def __del__(self):
"""This makes sure that no system resources are left open. Python only
garbage collects Python objects, not the child console."""
try:
self.wtty.terminate_child()
except:
pass
def __str__(self):
"""This returns a human-readable string that represents the state of
@@ -463,7 +463,7 @@ class spawn_windows ():
s.append('delaybeforesend: ' + str(self.delaybeforesend))
s.append('delayafterterminate: ' + str(self.delayafterterminate))
return '\n'.join(s)
def _spawn(self,command,args=[], echo=True):
"""This starts the given command in a child process. This does all the
fork/exec type of stuff for a pty. This is called by __init__. If args
@@ -487,15 +487,15 @@ class spawn_windows ():
if type (args) != type([]):
logger.info('TypeError: The argument, args, must be a list.')
raise TypeError ('The argument, args, must be a list.')
if args == []:
self.args = split_command_line(command)
self.command = self.args[0]
else:
self.args = args[:] # work with a copy
self.args.insert (0, command)
self.command = command
self.command = command
command_with_path = shutil.which(self.command)
if command_with_path is None:
logger.info('ExceptionPexpect: The command was not found or was not executable: %s.' % self.command)
@@ -508,30 +508,30 @@ class spawn_windows ():
#assert self.pid is None, 'The pid member should be None.'
#assert self.command is not None, 'The command member should not be None.'
self.wtty = Wtty(codepage=self.codepage, echo=echo)
self.wtty = Wtty(codepage=self.codepage, echo=echo)
if self.cwd is not None:
os.chdir(self.cwd)
self.child_fd = self.wtty.spawn(self.command, self.args, self.env)
if self.cwd is not None:
# Restore the original working dir
os.chdir(self.ocwd)
self.terminated = False
self.closed = False
self.pid = self.wtty.pid
def fileno (self): # File-like object.
"""There is no child fd."""
return 0
def close(self, force=True): # File-like object.
""" Closes the child console."""
self.closed = self.terminate(force)
if not self.closed:
logger.info('ExceptionPexpect: close() could not terminate the child using terminate()')
@@ -540,7 +540,7 @@ class spawn_windows ():
def isatty(self): # File-like object.
"""The child is always created with a console."""
return True
def waitnoecho (self, timeout=-1):
@@ -564,7 +564,7 @@ class spawn_windows ():
if timeout == -1:
timeout = self.timeout
if timeout is not None:
end_time = time.time() + timeout
end_time = time.time() + timeout
while True:
if not self.getecho():
return True
@@ -583,9 +583,9 @@ class spawn_windows ():
def setecho (self, state):
"""This sets the terminal echo mode on or off."""
self.wtty.setecho(state)
def read (self, size = -1): # File-like object.
"""This reads at most "size" bytes from the file (less if the read hits
@@ -682,7 +682,7 @@ class spawn_windows ():
if self.closed:
logger.info('ValueError: I/O operation on closed file in read_nonblocking().')
raise ValueError ('I/O operation on closed file in read_nonblocking().')
try:
# The real child and it's console are two different process. The console dies 0.1 sec
# later to be able to read the child's last output (before EOF). So here we check
@@ -698,7 +698,7 @@ class spawn_windows ():
except EOF:
self.flag_eof = True
raise
if self.logfile is not None:
self.logfile.write (s)
self.logfile.flush()
@@ -753,7 +753,7 @@ class spawn_windows ():
"""This sends a string to the child process. This returns the number of
bytes written. If a log file was set then the data is also written to
the log. """
(self.delaybeforesend)
if self.logfile is not None:
self.logfile.write (s)
@@ -767,7 +767,7 @@ class spawn_windows ():
def sendintr(self):
"""This sends a SIGINT to the child. It does not require
the SIGINT to be the first character on a line. """
self.wtty.sendintr()
def eof (self):
@@ -782,47 +782,47 @@ class spawn_windows ():
if not self.isalive():
return True
self.wtty.terminate_child()
time.sleep(self.delayafterterminate)
if not self.isalive():
return True
return False
def kill(self, sig=signal.SIGTERM):
"""Sig == sigint for ctrl-c otherwise the child is terminated."""
if not self.isalive():
return
if sig == signal.SIGINT:
self.wtty.sendintr()
else:
self.wtty.terminate_child()
def wait(self):
"""This waits until the child exits. This is a blocking call. This will
not read any data from the child, so this will block forever if the
child has unread output and has terminated. In other words, the child
may have printed output then called exit(); but, technically, the child
is still alive until its output is read."""
# We can't use os.waitpid under Windows because of 'permission denied'
# exception? Perhaps if not running as admin (or UAC enabled under
# We can't use os.waitpid under Windows because of 'permission denied'
# exception? Perhaps if not running as admin (or UAC enabled under
# Vista/7). Simply loop and wait for child to exit.
while self.isalive():
time.sleep(.05) # Keep CPU utilization down
return self.exitstatus
def isalive(self):
"""Determines if the child is still alive."""
if self.terminated:
logger.debug('self.terminated is true')
return False
if self.wtty.isalive():
return True
else:
@@ -989,14 +989,14 @@ class spawn_windows ():
This method is also useful when you don't want to have to worry about
escaping regular expression characters that you want to match."""
if not isinstance(pattern_list, list):
if not isinstance(pattern_list, list):
pattern_list = [pattern_list]
for p in pattern_list:
if type(p) not in (str,) and p not in (TIMEOUT, EOF):
logger.info('TypeError: Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p)))
raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p)))
return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def expect_loop(self, searcher, timeout = -1, searchwindowsize = -1):
@@ -1012,7 +1012,7 @@ class spawn_windows ():
if timeout == -1:
timeout = self.timeout
if timeout is not None:
end_time = time.time() + timeout
end_time = time.time() + timeout
if searchwindowsize == -1:
searchwindowsize = self.searchwindowsize
@@ -1075,22 +1075,22 @@ class spawn_windows ():
def getwinsize(self):
"""This returns the terminal window size of the child tty. The return
value is a tuple of (rows, cols). """
return self.wtty.getwinsize()
def setwinsize(self, r, c):
"""Set the size of the child screen buffer. """
self.wtty.setwinsize(r, c)
def interact(self):
"""Makes the child console visible for interaction"""
self.wtty.interact()
def stop_interact(self):
"""Hides the child console from the user."""
self.wtty.stop_interact()
##############################################################################
@@ -1122,15 +1122,15 @@ class Wtty:
self.timeout = timeout
self.totalRead = 0
self.local_echo = echo
def spawn(self, command, args=[], env=None):
"""Spawns spawner.py with correct arguments."""
ts = time.time()
self.startChild(args, env)
logger.info(f"Fetch child's process and pid...")
while True:
msg = win32gui.GetMessage(0, 0, 0)
childPid = msg[1][2]
@@ -1150,33 +1150,33 @@ class Wtty:
self.pid = childPid
break
time.sleep(.05)
logger.info(f"Child's pid: {self.pid}")
if not self.__childProcess:
logger.info('ExceptionPexpect: The process ' + args[0] + ' could not be started.')
raise ExceptionPexpect ('The process ' + args[0] + ' could not be started.')
raise ExceptionPexpect ('The process ' + args[0] + ' could not be started.')
winHandle = int(win32console.GetConsoleWindow())
self.__switch = True
if winHandle != 0:
self.__parentPid = win32process.GetWindowThreadProcessId(winHandle)[1]
self.__parentPid = win32process.GetWindowThreadProcessId(winHandle)[1]
# Do we have a console attached? Do not rely on winHandle, because
# it will also be non-zero if we didn't have a console, and then
# it will also be non-zero if we didn't have a console, and then
# spawned a child process! Using sys.stdout.isatty() seems safe
self.console = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
# If the original process had a console, record a list of attached
# processes so we can check if we need to reattach/reallocate the
# processes so we can check if we need to reattach/reallocate the
# console later
self.processList = win32console.GetConsoleProcessList()
else:
self.switchTo(False)
self.__switch = False
def startChild(self, args, env):
si = win32process.GetStartupInfo()
si.dwFlags = win32process.STARTF_USESHOWWINDOW
@@ -1184,8 +1184,8 @@ class Wtty:
# Determine the directory of wexpect.py or, if we are running 'frozen'
# (eg. py2exe deployment), of the packed executable
dirname = os.path.dirname(sys.executable
if getattr(sys, 'frozen', False) else
dirname = os.path.dirname(sys.executable
if getattr(sys, 'frozen', False) else
os.path.abspath(__file__))
if getattr(sys, 'frozen', False):
logdir = os.path.splitext(sys.executable)[0]
@@ -1197,15 +1197,15 @@ class Wtty:
if getattr(sys, 'frozen', False):
# If we are running 'frozen', add library.zip and lib\library.zip
# to sys.path
# py2exe: Needs appropriate 'zipfile' option in setup script and
# py2exe: Needs appropriate 'zipfile' option in setup script and
# 'bundle_files' 3
spath.append(os.path.join(dirname, 'library.zip'))
spath.append(os.path.join(dirname, 'library.zip',
spath.append(os.path.join(dirname, 'library.zip',
os.path.basename(os.path.splitext(sys.executable)[0])))
if os.path.isdir(os.path.join(dirname, 'lib')):
dirname = os.path.join(dirname, 'lib')
spath.append(os.path.join(dirname, 'library.zip'))
spath.append(os.path.join(dirname, 'library.zip',
spath.append(os.path.join(dirname, 'library.zip',
os.path.basename(os.path.splitext(sys.executable)[0])))
pyargs.insert(0, '-S') # skip 'import site'
pid = win32process.GetCurrentProcessId()
@@ -1213,37 +1213,37 @@ class Wtty:
cp = self.codepage or windll.kernel32.GetACP()
# If we are running 'frozen', expect python.exe in the same directory
# as the packed executable.
# py2exe: The python executable can be included via setup script by
# py2exe: The python executable can be included via setup script by
# adding it to 'data_files'
if getattr(sys, 'frozen', False):
python_executable = os.path.join(dirname, 'python.exe')
python_executable = os.path.join(dirname, 'python.exe')
else:
python_executable = os.path.join(os.path.dirname(sys.executable), 'python.exe')
commandLine = '"%s" %s "%s"' % (python_executable,
' '.join(pyargs),
commandLine = '"%s" %s "%s"' % (python_executable,
' '.join(pyargs),
f"import sys; sys.path = {spath} + sys.path;"
f"args = {args}; import wexpect;"
f"wexpect.ConsoleReader(wexpect.join_args(args), {pid}, {tid}, cp={cp}, logdir={logdir})"
)
logger.info(f'CreateProcess: {commandLine}')
self.__oproc, x, self.conpid, self.__otid = win32process.CreateProcess(None, commandLine, None, None, False,
self.__oproc, x, self.conpid, self.__otid = win32process.CreateProcess(None, commandLine, None, None, False,
win32process.CREATE_NEW_CONSOLE, env, None, si)
logger.info(f'self.__oproc: {self.__oproc}')
logger.info(f'x: {x}')
logger.info(f'self.conpid: {self.conpid}')
logger.info(f'self.__otid: {self.__otid}')
def switchTo(self, attatched=True):
"""Releases from the current console and attatches
to the childs."""
if not self.__switch:
return
try:
# No 'attached' check is needed, FreeConsole() can be called multiple times.
win32console.FreeConsole()
@@ -1254,11 +1254,11 @@ class Wtty:
# When child has finished...
logger.info('EOF: End Of File (EOF) in switchTo().')
raise EOF('End Of File (EOF) in switchTo().')
win32console.AttachConsole(self.conpid)
self.__consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)
self.__consout = self.getConsoleOut()
except pywintypes.error as e:
# pywintypes.error: (5, 'AttachConsole', 'Access is denied.')
# When child has finished...
@@ -1274,15 +1274,15 @@ class Wtty:
self.switchBack()
logger.info(traceback.format_exc())
raise
def switchBack(self):
"""Releases from the current console and attaches
"""Releases from the current console and attaches
to the parents."""
if not self.__switch:
return
if self.console:
# If we originally had a console, re-attach it (or allocate a new one)
# If we didn't have a console to begin with, there's no need to
@@ -1294,44 +1294,44 @@ class Wtty:
else:
# Our original console has been free'd, allocate a new one
win32console.AllocConsole()
self.__consin = None
self.__consout = None
def getConsoleOut(self):
consout = win32file.CreateFile('CONOUT$',
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
0,
consout = win32file.CreateFile('CONOUT$',
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
0,
0)
return win32console.PyConsoleScreenBufferType(consout)
return win32console.PyConsoleScreenBufferType(consout)
def getchild(self):
"""Returns a handle to the child process."""
return self.__childProcess
def terminate_child(self):
"""Terminate the child process."""
win32api.TerminateProcess(self.__childProcess, 1)
# win32api.win32process.TerminateProcess(self.__childProcess, 1)
def createKeyEvent(self, char):
"""Creates a single key record corrosponding to
the ascii character char."""
evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT)
evt.KeyDown = True
evt.Char = char
evt.RepeatCount = 1
return evt
return evt
def write(self, s):
"""Writes input into the child consoles input buffer."""
if len(s) == 0:
return 0
self.switchTo()
@@ -1341,41 +1341,41 @@ class Wtty:
records = [self.createKeyEvent(c) for c in str(s)]
if not self.__consout:
return ""
# Store the current cursor position to hide characters in local echo disabled mode (workaround).
consinfo = self.__consout.GetConsoleScreenBufferInfo()
startCo = consinfo['CursorPosition']
# Send the string to console input
wrote = self.__consin.WriteConsoleInput(records)
# Wait until all input has been recorded by the console.
ts = time.time()
while self.__consin.PeekConsoleInput(8) != ():
if time.time() > ts + len(s) * .1 + .5:
break
time.sleep(.05)
# Hide characters in local echo disabled mode (workaround).
if not self.local_echo:
self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, len(s), startCo)
return wrote
finally:
self.switchBack()
def getCoord(self, offset):
"""Converts an offset to a point represented as a tuple."""
x = offset % self.__consSize[0]
y = offset // self.__consSize[0]
return win32console.PyCOORDType(x, y)
def getOffset(self, coord):
"""Converts a tuple-point to an offset."""
return coord.X + coord.Y * self.__consSize[0]
def readConsole(self, startCo, endCo):
"""Reads the console area from startCo to endCo and returns it
as a string."""
@@ -1387,10 +1387,10 @@ class Wtty:
startOff = self.getOffset(startCo)
endOff = self.getOffset(endCo)
readlen = endOff - startOff
if readlen <= 0:
break
if readlen > 4000:
readlen = 4000
endPoint = self.getCoord(startOff + readlen)
@@ -1403,12 +1403,12 @@ class Wtty:
startCo = endPoint
return ''.join(buff)
def parseData(self, s):
"""Ensures that special characters are interpretted as
newlines or blanks, depending on if there written over
characters or screen-buffer-fill characters."""
strlist = []
for i, c in enumerate(s):
if c == screenbufferfillchar:
@@ -1419,35 +1419,35 @@ class Wtty:
s = ''.join(strlist)
return s
def readConsoleToCursor(self):
"""Reads from the current read position to the current cursor
position and inserts the string into self.__buffer."""
if not self.__consout:
return ""
consinfo = self.__consout.GetConsoleScreenBufferInfo()
cursorPos = consinfo['CursorPosition']
logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo))
isSameX = cursorPos.X == self.__currentReadCo.X
isSameY = cursorPos.Y == self.__currentReadCo.Y
isSamePos = isSameX and isSameY
logger.debug('isSameY: %r' % isSameY)
logger.debug('isSamePos: %r' % isSamePos)
if isSameY or not self.lastReadData.endswith('\r\n'):
# Read the current slice again
self.totalRead -= self.lastRead
self.__currentReadCo.X = 0
self.__currentReadCo.Y = self.__bufferY
logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo))
raw = self.readConsole(self.__currentReadCo, cursorPos)
rawlist = []
while raw:
@@ -1461,16 +1461,16 @@ class Wtty:
# Record the Y offset where the most recent line break was detected
self.__bufferY += len(rawlist) - i
break
logger.debug('lastReadData: %r' % self.lastReadData)
logger.debug('s: %r' % s)
if isSamePos and self.lastReadData == s:
logger.debug('isSamePos and self.lastReadData == s')
s = ''
logger.debug('s: %r' % s)
if s:
lastReadData = self.lastReadData
pos = self.getOffset(self.__currentReadCo)
@@ -1491,7 +1491,7 @@ class Wtty:
self.lastRead = lastRead
else:
# Cursor has been repositioned
s = '\r' + s
s = '\r' + s
logger.debug('s: %r' % s)
self.__buffer.seek(pos)
self.__buffer.truncate()
@@ -1501,36 +1501,36 @@ class Wtty:
self.__currentReadCo.Y = cursorPos.Y
return s
def read_nonblocking(self, size):
"""Reads data from the console if available, otherwise
returns empty string"""
returns empty string"""
try:
self.switchTo()
time.sleep(.01)
if self.__currentReadCo.Y > maxconsoleY:
time.sleep(.2)
s = self.readConsoleToCursor()
if self.__currentReadCo.Y > maxconsoleY:
self.refreshConsole()
return s
finally:
self.switchBack()
raise Exception('Unreachable code...') # pragma: no cover
def refreshConsole(self):
"""Clears the console after pausing the child and
reading all the data currently on the console."""
orig = win32console.PyCOORDType(0, 0)
self.__consout.SetConsoleCursorPosition(orig)
self.__currentReadCo.X = 0
@@ -1539,81 +1539,81 @@ class Wtty:
# Use NUL as fill char because it displays as whitespace
# (if we interact() with the child)
self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, writelen, orig)
self.__bufferY = 0
self.__buffer.truncate(0)
consinfo = self.__consout.GetConsoleScreenBufferInfo()
cursorPos = consinfo['CursorPosition']
logger.debug('refreshConsole: cursorPos %s' % cursorPos)
def setecho(self, state):
"""Sets the echo mode of the child console.
This is a workaround of the setecho. The original GetConsoleMode() / SetConsoleMode()
methods didn't work. See git history for the concrete implementation.
2020.01.09 raczben
"""
self.local_echo = state
def getecho(self):
"""Returns the echo mode of the child console.
This is a workaround of the getecho. The original GetConsoleMode() / SetConsoleMode()
methods didn't work. See git history for the concrete implementation.
2020.01.09 raczben
"""
return self.local_echo
def getwinsize(self):
"""Returns the size of the child console as a tuple of
(rows, columns)."""
self.switchTo()
try:
size = self.__consout.GetConsoleScreenBufferInfo()['Size']
finally:
self.switchBack()
return (size.Y, size.X)
def setwinsize(self, r, c):
"""Sets the child console screen buffer size to (r, c)."""
self.switchTo()
try:
self.__consout.SetConsoleScreenBufferSize(win32console.PyCOORDType(c, r))
finally:
self.switchBack()
def interact(self):
"""Displays the child console for interaction."""
self.switchTo()
try:
win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_SHOW)
finally:
self.switchBack()
def stop_interact(self):
"""Hides the child console."""
self.switchTo()
try:
win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_HIDE)
finally:
self.switchBack()
def isalive(self, console=False):
"""True if the child is still alive, false otherwise"""
if console:
return win32process.GetExitCodeProcess(self.__conProcess) == win32con.STILL_ACTIVE
else:
return win32process.GetExitCodeProcess(self.__childProcess) == win32con.STILL_ACTIVE
class ConsoleReader: # pragma: no cover
def __init__(self, path, pid, tid, env = None, cp=None, logdir=None):
self.logdir = logdir
logger.info('consolepid: {}'.format(os.getpid()))
@@ -1633,9 +1633,9 @@ class ConsoleReader: # pragma: no cover
try:
consout = self.getConsoleOut()
self.initConsole(consout)
si = win32process.GetStartupInfo()
self.__childProcess, _, childPid, self.__tid = win32process.CreateProcess(None, path, None, None, False,
self.__childProcess, _, childPid, self.__tid = win32process.CreateProcess(None, path, None, None, False,
0, None, None, si)
logger.info('childPid: {} host_pid: {}'.format(childPid, pid))
except Exception:
@@ -1643,62 +1643,62 @@ class ConsoleReader: # pragma: no cover
time.sleep(.1)
win32api.PostThreadMessage(int(tid), win32con.WM_USER, 0, 0)
sys.exit()
time.sleep(.1)
win32api.PostThreadMessage(int(tid), win32con.WM_USER, childPid, 0)
parent = win32api.OpenProcess(win32con.PROCESS_TERMINATE | win32con.PROCESS_QUERY_INFORMATION , 0, int(pid))
paused = False
while True:
consinfo = consout.GetConsoleScreenBufferInfo()
cursorPos = consinfo['CursorPosition']
if win32process.GetExitCodeProcess(parent) != win32con.STILL_ACTIVE or win32process.GetExitCodeProcess(self.__childProcess) != win32con.STILL_ACTIVE:
time.sleep(.1)
try:
win32process.TerminateProcess(self.__childProcess, 0)
except pywintypes.error as e:
# 'Access denied' happens always? Perhaps if not
# running as admin (or UAC enabled under Vista/7).
# Don't log. Child process will exit regardless when
# 'Access denied' happens always? Perhaps if not
# running as admin (or UAC enabled under Vista/7).
# Don't log. Child process will exit regardless when
# calling sys.exit
if e.args[0] != winerror.ERROR_ACCESS_DENIED:
logger.info(e)
logger.info('Exiting...')
sys.exit()
if cursorPos.Y > maxconsoleY and not paused:
self.suspendThread()
paused = True
if cursorPos.Y <= maxconsoleY and paused:
self.resumeThread()
paused = False
time.sleep(.1)
except Exception as e:
logger.info(e)
time.sleep(.1)
def handler(self, sig):
def handler(self, sig):
logger.info(sig)
return False
def getConsoleOut(self):
consout = win32file.CreateFile('CONOUT$',
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
0,
consout = win32file.CreateFile('CONOUT$',
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None,
win32con.OPEN_EXISTING,
0,
0)
return win32console.PyConsoleScreenBufferType(consout)
def initConsole(self, consout):
def initConsole(self, consout):
rect = win32console.PySMALL_RECTType(0, 0, 79, 24)
consout.SetConsoleWindowInfo(True, rect)
size = win32console.PyCOORDType(80, 16000)
@@ -1706,20 +1706,20 @@ class ConsoleReader: # pragma: no cover
pos = win32console.PyCOORDType(0, 0)
# Use NUL as fill char because it displays as whitespace
# (if we interact() with the child)
consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos)
consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos)
def suspendThread(self):
"""Pauses the main thread of the child process."""
handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.__tid)
win32process.SuspendThread(handle)
def resumeThread(self):
"""Un-pauses the main thread of the child process."""
handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.__tid)
win32process.ResumeThread(handle)
class searcher_string (object):
"""This is a plain string search helper for the spawn.expect_any() method.
@@ -1795,7 +1795,7 @@ class searcher_string (object):
# rescanning until we've read three more bytes.
#
# Sadly, I don't know enough about this interesting topic. /grahn
for index, s in self._strings:
if searchwindowsize is None:
# the match, if any, can only be in the fresh data,
@@ -1874,7 +1874,7 @@ class searcher_re (object):
'buffer' which have not been searched before.
See class spawn for the 'searchwindowsize' argument.
If there is a match this returns the index of that string, and sets
'start', 'end' and 'match'. Otherwise, returns -1."""

View File

@@ -1,7 +1,7 @@
"""Wexpect is a Windows variant of pexpect https://pexpect.readthedocs.io.
Wexpect is a Python module for spawning child applications and controlling
them automatically.
them automatically.
wexpect util contains small functions, and classes, which are used in multiple classes.
The command line argument parsers, and the Exceptions placed here.
@@ -48,18 +48,23 @@ import signal
EOF_CHAR = b'\x04'
SIGNAL_CHARS = {
signal.SIGTERM: b'\x011', # Device control 1
signal.SIGINT: b'\x012', # Device control 2
}
signal.SIGTERM: b'\x011', # Device control 1
signal.SIGINT: b'\x012', # Device control 2
}
SPAM = 5
logging.addLevelName(SPAM, "SPAM")
def spam(self, message, *args, **kws):
if self.isEnabledFor(SPAM):
# Yes, logger takes its '*args' as 'args'.
self._log(SPAM, message, args, **kws)
logging.Logger.spam = spam
def init_logger(logger):
try:
logger_level = os.environ['WEXPECT_LOGGER_LEVEL']
@@ -72,13 +77,15 @@ def init_logger(logger):
logger_filename = f'{logger_filename}.log'
os.makedirs(os.path.dirname(logger_filename), exist_ok=True)
fh = logging.FileHandler(logger_filename, 'w', 'utf-8')
formatter = logging.Formatter('%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s')
formatter = logging.Formatter(
'%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
except KeyError:
logger.setLevel(logging.ERROR)
def split_command_line(command_line, escape_char = '^'):
def split_command_line(command_line, escape_char='^'):
"""This splits a command line into a list of arguments. It splits arguments
on spaces, but handles embedded quotes, doublequotes, and escaped
characters. It's impossible to do this with a regular expression, so I
@@ -92,21 +99,21 @@ def split_command_line(command_line, escape_char = '^'):
state_esc = 1
state_singlequote = 2
state_doublequote = 3
state_whitespace = 4 # The state of consuming whitespace between commands.
state_whitespace = 4 # The state of consuming whitespace between commands.
state = state_basic
for c in command_line:
if state == state_basic or state == state_whitespace:
if c == escape_char: # Escape the next character
if c == escape_char: # Escape the next character
state = state_esc
elif c == r"'": # Handle single quote
elif c == r"'": # Handle single quote
state = state_singlequote
elif c == r'"': # Handle double quote
elif c == r'"': # Handle double quote
state = state_doublequote
elif c.isspace():
# Add arg to arg_list if we aren't in the middle of whitespace.
if state == state_whitespace:
None # Do nothing.
None # Do nothing.
else:
arg_list.append(arg)
arg = ''
@@ -132,8 +139,9 @@ def split_command_line(command_line, escape_char = '^'):
arg_list.append(arg)
return arg_list
def join_args(args):
"""Joins arguments into a command line. It quotes all arguments that contain
"""Joins arguments a command line. It quotes all arguments that contain
spaces or any of the characters ^!$%&()[]{}=;'+,`~"""
commandline = []
for arg in args:
@@ -178,6 +186,6 @@ class EOF(ExceptionPexpect):
"""Raised when EOF is read from a child. This usually means the child has exited.
The user can wait to EOF, which means he waits the end of the execution of the child process."""
class TIMEOUT(ExceptionPexpect):
"""Raised when a read time exceeds the timeout. """