From 4bd5dea6938d5f3e2dfbd3e86b8de82c4985b397 Mon Sep 17 00:00:00 2001 From: Benedek Racz Date: Thu, 6 Feb 2020 11:47:56 +0100 Subject: [PATCH] [FIX] flake8 linter first run; Atom first fixes --- setup.cfg | 10 +- wexpect/__init__.py | 20 +- wexpect/console_reader.py | 213 ++++++++++---------- wexpect/host.py | 363 ++++++++++++++++++---------------- wexpect/legacy_wexpect.py | 404 +++++++++++++++++++------------------- wexpect/wexpect_util.py | 34 ++-- 6 files changed, 542 insertions(+), 502 deletions(-) diff --git a/setup.cfg b/setup.cfg index bf6df76..bebae22 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,12 +9,12 @@ requires-python = >=3.4 project_urls = Source Code = https://github.com/raczben/wexpect license = MIT -classifier = +classifier = Development Status :: 4 - Beta Environment :: Console Intended Audience :: Developers Intended Audience :: Information Technology - Operating System :: Microsoft :: Windows + Operating System :: Microsoft :: Windows Programming Language :: Python :: 3.3 Programming Language :: Python :: 3.4 Programming Language :: Python :: 3.5 @@ -30,3 +30,9 @@ test = setuptools>=38.0 codecov twine + +[flake8] + max-line-length = 100 + ignore = + # E402: Module level import not at top of the file + E402 diff --git a/wexpect/__init__.py b/wexpect/__init__.py index 8114ffd..ff091b6 100644 --- a/wexpect/__init__.py +++ b/wexpect/__init__.py @@ -1,7 +1,7 @@ # __init__.py import os -import pkg_resources +import pkg_resources try: spawn_class_name = os.environ['WEXPECT_SPAWN_CLASS'] @@ -22,10 +22,10 @@ if spawn_class_name == 'legacy_wexpect': from .legacy_wexpect import searcher_re __all__ = ['ExceptionPexpect', 'EOF', 'TIMEOUT', 'spawn', 'run', 'split_command_line', - '__version__', 'ConsoleReader', 'join_args', 'searcher_string', 'searcher_re'] - + '__version__', 'ConsoleReader', 'join_args', 'searcher_string', 'searcher_re'] + else: - + from .wexpect_util import split_command_line from .wexpect_util import join_args from .wexpect_util import ExceptionPexpect @@ -40,19 +40,19 @@ else: from .host import run from .host import searcher_string from .host import searcher_re - + try: spawn = globals()[spawn_class_name] except KeyError: print(f'Error: no spawn class: {spawn_class_name}') raise - + # The version is handled by the package: pbr, which derives the version from the git tags. try: __version__ = pkg_resources.require("wexpect")[0].version - except: # pragma: no cover + except Exception: # pragma: no cover __version__ = '0.0.1.unkowndev0' - + __all__ = ['split_command_line', 'join_args', 'ExceptionPexpect', 'EOF', 'TIMEOUT', - 'ConsoleReaderSocket', 'ConsoleReaderPipe', 'spawn', 'SpawnSocket', 'SpawnPipe', 'run', - 'searcher_string', 'searcher_re', '__version__'] + 'ConsoleReaderSocket', 'ConsoleReaderPipe', 'spawn', 'SpawnSocket', 'SpawnPipe', + 'run', 'searcher_string', 'searcher_re', '__version__'] diff --git a/wexpect/console_reader.py b/wexpect/console_reader.py index 2397124..329fa55 100644 --- a/wexpect/console_reader.py +++ b/wexpect/console_reader.py @@ -1,7 +1,7 @@ """Wexpect is a Windows variant of pexpect https://pexpect.readthedocs.io. Wexpect is a Python module for spawning child applications and controlling -them automatically. +them automatically. console_reader Implements a virtual terminal, and starts the child program. The main wexpect.Spawn class connect to this class to reach the child's terminal. @@ -42,7 +42,6 @@ import logging import os import traceback import psutil -import signal from io import StringIO import ctypes @@ -59,38 +58,38 @@ from .wexpect_util import init_logger from .wexpect_util import EOF_CHAR from .wexpect_util import SIGNAL_CHARS -# +# # System-wide constants -# +# screenbufferfillchar = '\4' maxconsoleY = 8000 default_port = 4321 - # # Create logger: We write logs only to file. Printing out logs are dangerous, because of the deep # console manipulation. # logger = logging.getLogger('wexpect') - + init_logger(logger) + class ConsoleReaderBase: """Consol class (aka. client-side python class) for the child. - + This class initialize the console starts the child in it and reads the console periodically. """ def __init__(self, path, host_pid, codepage=None, window_size_x=80, window_size_y=25, buffer_size_x=80, buffer_size_y=16000, local_echo=True, interact=False, **kwargs): - """Initialize the console starts the child in it and reads the console periodically. + """Initialize the console starts the child in it and reads the console periodically. Args: path (str): Child's executable with arguments. parent_pid (int): Parent (aka. host) process process-ID codepage (:obj:, optional): Output console code page. """ - self.lastRead = 0 + self.lastRead = 0 self.__bufferY = 0 self.lastReadData = "" self.totalRead = 0 @@ -107,70 +106,70 @@ class ConsoleReaderBase: self.child_process = None self.child_pid = None self.enable_signal_chars = True - + logger.info("ConsoleReader started") - + if codepage is None: codepage = windll.kernel32.GetACP() - + try: logger.info("Setting console output code page to %s" % codepage) win32console.SetConsoleOutputCP(codepage) - logger.info("Console output code page: %s" % ctypes.windll.kernel32.GetConsoleOutputCP()) + logger.info( + "Console output code page: %s" % ctypes.windll.kernel32.GetConsoleOutputCP()) except Exception as e: logger.info(e) - + try: self.create_connection(**kwargs) logger.info('Spawning %s' % path) try: self.initConsole() si = win32process.GetStartupInfo() - self.__childProcess, _, self.child_pid, self.__tid = win32process.CreateProcess(None, path, None, None, False, - 0, None, None, si) + self.__childProcess, _, self.child_pid, self.__tid = win32process.CreateProcess( + None, path, None, None, False, 0, None, None, si) self.child_process = psutil.Process(self.child_pid) - + logger.info(f'Child pid: {self.child_pid} Console pid: {self.console_pid}') - - except: + + except Exception: logger.info(traceback.format_exc()) return - + if interact: self.interact() self.interact() - + self.read_loop() - except: + except Exception: logger.error(traceback.format_exc()) time.sleep(.1) finally: try: self.terminate_child() - time.sleep(.01) + time.sleep(.01) self.send_to_host(self.readConsoleToCursor()) self.sendeof() time.sleep(.1) self.close_connection() logger.info('Console finished.') - except: + except Exception: logger.error(traceback.format_exc()) time.sleep(.1) - - + def read_loop(self): paused = False - + while True: if not self.isalive(self.host_process): logger.info('Host process has been died.') return - + self.child_exitstatus = win32process.GetExitCodeProcess(self.__childProcess) if self.child_exitstatus != win32con.STILL_ACTIVE: logger.info(f'Child finished with code: {self.child_exitstatus}') - return - + return + consinfo = self.consout.GetConsoleScreenBufferInfo() cursorPos = consinfo['CursorPosition'] self.send_to_host(self.readConsoleToCursor()) @@ -180,24 +179,24 @@ class ConsoleReaderBase: else: logger.spam(f'get_from_host: {s}') if self.enable_signal_chars: - for sig,char in SIGNAL_CHARS.items(): + for sig, char in SIGNAL_CHARS.items(): if char in s: self.child_process.send_signal(sig) s = s.decode() self.write(s) - + if cursorPos.Y > maxconsoleY and not paused: logger.info('cursorPos %s' % cursorPos) self.suspendThread() paused = True - + if cursorPos.Y <= maxconsoleY and paused: logger.info('cursorPos %s' % cursorPos) self.resumeThread() paused = False - + time.sleep(.02) - + def terminate_child(self): try: if self.child_process: @@ -205,7 +204,7 @@ class ConsoleReaderBase: except psutil.NoSuchProcess: logger.info('The process has already died.') return - + def isalive(self, process): """True if the child is still alive, false otherwise""" try: @@ -213,10 +212,10 @@ class ConsoleReaderBase: return False except psutil.TimeoutExpired: return True - + def write(self, s): """Writes input into the child consoles input buffer.""" - + if len(s) == 0: return 0 if s[-1] == '\n': @@ -224,44 +223,45 @@ class ConsoleReaderBase: records = [self.createKeyEvent(c) for c in str(s)] if not self.consout: return "" - - # Store the current cursor position to hide characters in local echo disabled mode (workaround). + + # Store the current cursor position to hide characters in local echo disabled mode + # (workaround). consinfo = self.consout.GetConsoleScreenBufferInfo() startCo = consinfo['CursorPosition'] - + # Send the string to console input wrote = self.consin.WriteConsoleInput(records) - + # Wait until all input has been recorded by the console. ts = time.time() while self.consin.PeekConsoleInput(8) != (): if time.time() > ts + len(s) * .1 + .5: break time.sleep(.05) - + # Hide characters in local echo disabled mode (workaround). if not self.local_echo: self.consout.FillConsoleOutputCharacter(screenbufferfillchar, len(s), startCo) - + return wrote - + def createKeyEvent(self, char): """Creates a single key record corrosponding to the ascii character char.""" - + evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT) evt.KeyDown = True evt.Char = char evt.RepeatCount = 1 - return evt - + return evt + def initConsole(self, consout=None, window_size_x=80, window_size_y=25, buffer_size_x=80, buffer_size_y=16000): if not consout: - consout=self.getConsoleOut() - + consout = self.getConsoleOut() + self.consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE) - + rect = win32console.PySMALL_RECTType(0, 0, window_size_x-1, window_size_y-1) consout.SetConsoleWindowInfo(True, rect) size = win32console.PyCOORDType(buffer_size_x, buffer_size_y) @@ -269,18 +269,18 @@ class ConsoleReaderBase: pos = win32console.PyCOORDType(0, 0) # Use NUL as fill char because it displays as whitespace # (if we interact() with the child) - consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos) - + consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos) + consinfo = consout.GetConsoleScreenBufferInfo() self.__consSize = consinfo['Size'] logger.info('self.__consSize: ' + str(self.__consSize)) - self.startCursorPos = consinfo['CursorPosition'] - + self.startCursorPos = consinfo['CursorPosition'] + def parseData(self, s): """Ensures that special characters are interpretted as newlines or blanks, depending on if there written over characters or screen-buffer-fill characters.""" - + strlist = [] for i, c in enumerate(s): if c == screenbufferfillchar: @@ -291,44 +291,45 @@ class ConsoleReaderBase: s = ''.join(strlist) return s - + def getConsoleOut(self): - consfile = win32file.CreateFile('CONOUT$', - win32con.GENERIC_READ | win32con.GENERIC_WRITE, - win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, - None, - win32con.OPEN_EXISTING, - 0, - 0) - + consfile = win32file.CreateFile( + 'CONOUT$', + win32con.GENERIC_READ | win32con.GENERIC_WRITE, + win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, + None, + win32con.OPEN_EXISTING, + 0, + 0) + self.consout = win32console.PyConsoleScreenBufferType(consfile) return self.consout - + def getCoord(self, offset): """Converts an offset to a point represented as a tuple.""" - - x = offset % self.__consSize.X + + x = offset % self.__consSize.X y = offset // self.__consSize.X return win32console.PyCOORDType(x, y) def getOffset(self, coord): """Converts a tuple-point to an offset.""" - + return coord.X + coord.Y * self.__consSize.X - + def readConsole(self, startCo, endCo): """Reads the console area from startCo to endCo and returns it as a string.""" - + if startCo is None: - startCo = self.startCursorPos + startCo = self.startCursorPos startCo.Y = startCo.Y - + if endCo is None: consinfo = self.consout.GetConsoleScreenBufferInfo() endCo = consinfo['CursorPosition'] - endCo= self.getCoord(0 + self.getOffset(endCo)) - + endCo = self.getCoord(0 + self.getOffset(endCo)) + buff = [] self.lastRead = 0 @@ -336,10 +337,10 @@ class ConsoleReaderBase: startOff = self.getOffset(startCo) endOff = self.getOffset(endCo) readlen = endOff - startOff - + if readlen <= 0: break - + if readlen > 4000: readlen = 4000 endPoint = self.getCoord(startOff + readlen) @@ -352,34 +353,34 @@ class ConsoleReaderBase: startCo = endPoint return ''.join(buff) - + def readConsoleToCursor(self): """Reads from the current read position to the current cursor position and inserts the string into self.__buffer.""" - + if not self.consout: return "" - + consinfo = self.consout.GetConsoleScreenBufferInfo() cursorPos = consinfo['CursorPosition'] - + logger.spam('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo)) isSameX = cursorPos.X == self.__currentReadCo.X isSameY = cursorPos.Y == self.__currentReadCo.Y isSamePos = isSameX and isSameY - + logger.spam('isSameY: %r' % isSameY) logger.spam('isSamePos: %r' % isSamePos) - + if isSameY or not self.lastReadData.endswith('\r\n'): # Read the current slice again self.totalRead -= self.lastRead self.__currentReadCo.X = 0 self.__currentReadCo.Y = self.__bufferY - + logger.spam('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo)) - + raw = self.readConsole(self.__currentReadCo, cursorPos) rawlist = [] while raw: @@ -392,17 +393,17 @@ class ConsoleReaderBase: # Record the Y offset where the most recent line break was detected self.__bufferY += len(rawlist) - i break - + logger.spam('lastReadData: %r' % self.lastReadData) if s: logger.debug('Read: %r' % s) else: logger.spam('Read: %r' % s) - + if isSamePos and self.lastReadData == s: logger.spam('isSamePos and self.lastReadData == s') s = '' - + if s: lastReadData = self.lastReadData pos = self.getOffset(self.__currentReadCo) @@ -430,46 +431,46 @@ class ConsoleReaderBase: self.__currentReadCo.Y = cursorPos.Y return s - + def interact(self): """Displays the child console for interaction.""" - + logger.debug('Start interact window') win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_SHOW) - + def sendeof(self): """This sends an EOF to the host. This sends a character which inform the host that child has been finished, and all of it's output has been send to host. """ self.send_to_host(EOF_CHAR) - - -class ConsoleReaderSocket(ConsoleReaderBase): - + + +class ConsoleReaderSocket(ConsoleReaderBase): + def create_connection(self, **kwargs): - try: + try: self.port = kwargs['port'] # Create a TCP/IP socket self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_address = ('localhost', self.port) self.sock.bind(server_address) logger.info(f'Socket started at port: {self.port}') - + # Listen for incoming connections self.sock.settimeout(5) self.sock.listen(1) self.connection, client_address = self.sock.accept() self.connection.settimeout(.01) logger.info(f'Client connected: {client_address}') - except: + except Exception: logger.error(f"Port: {self.port}") raise - + def close_connection(self): if self.connection: self.connection.close() - + def send_to_host(self, msg): # convert to bytes if isinstance(msg, str): @@ -479,7 +480,7 @@ class ConsoleReaderSocket(ConsoleReaderBase): else: logger.spam(f'Sending msg: {msg}') self.connection.sendall(msg) - + def get_from_host(self): try: msg = self.connection.recv(4096) @@ -498,8 +499,8 @@ class ConsoleReaderSocket(ConsoleReaderBase): else: # got a message do something :) return msg - - + + class ConsoleReaderPipe(ConsoleReaderBase): def create_connection(self, **kwargs): pipe_name = 'wexpect_{}'.format(self.console_pid) @@ -513,11 +514,11 @@ class ConsoleReaderPipe(ConsoleReaderBase): logger.info("waiting for client") win32pipe.ConnectNamedPipe(self.pipe, None) logger.info('got client') - + def close_connection(self): if self.pipe: win32file.CloseHandle(self.pipe) - + def send_to_host(self, msg): # convert to bytes if isinstance(msg, str): @@ -527,7 +528,7 @@ class ConsoleReaderPipe(ConsoleReaderBase): else: logger.spam(f'Sending msg: {msg}') win32file.WriteFile(self.pipe, msg) - + def get_from_host(self): data, avail, bytes_left = win32pipe.PeekNamedPipe(self.pipe, 4096) logger.spam(f'data: {data} avail:{avail} bytes_left{bytes_left}') diff --git a/wexpect/host.py b/wexpect/host.py index f775d6a..c803cb4 100644 --- a/wexpect/host.py +++ b/wexpect/host.py @@ -7,7 +7,7 @@ scripts for duplicating software package installations on different servers. It can be used for automated software testing. Wexpect is in the spirit of Don Libes' Expect, but Wexpect is pure Python. Other Expect-like modules for Python require TCL and Expect or require C extensions to be compiled. Wexpect does not -use C, Expect, or TCL extensions. +use C, Expect, or TCL extensions. There are two main interfaces to Wexpect -- the function, run() and the class, spawn. You can call the run() function to execute a command and return the @@ -92,12 +92,12 @@ from .wexpect_util import EOF_CHAR from .wexpect_util import SIGNAL_CHARS logger = logging.getLogger('wexpect') - + init_logger(logger) -def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None, logfile=None, - cwd=None, env=None, **kwargs): +def run(command, timeout=-1, withexitstatus=False, events=None, extra_args=None, logfile=None, + cwd=None, env=None, **kwargs): """ This function runs the given command; waits for it to finish; then returns all output as a string. STDERR is included in output. If the full @@ -162,21 +162,22 @@ def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None if timeout == -1: child = spawn(command, maxread=2000, logfile=logfile, cwd=cwd, env=env, **kwargs) else: - child = spawn(command, timeout=timeout, maxread=2000, logfile=logfile, cwd=cwd, env=env, **kwargs) + child = spawn( + command, timeout=timeout, maxread=2000, logfile=logfile, cwd=cwd, env=env, **kwargs) if events is not None: patterns = list(events.keys()) responses = list(events.values()) else: - patterns=None # We assume that EOF or TIMEOUT will save us. - responses=None + patterns = None # We assume that EOF or TIMEOUT will save us. + responses = None child_result_list = [] event_count = 0 while 1: try: - index = child.expect (patterns) + index = child.expect(patterns) if type(child.after) in (str,): child_result_list.append(child.before + child.after) - else: # child.after may have been a TIMEOUT or EOF, so don't cat those. + else: # child.after may have been a TIMEOUT or EOF, so don't cat those. child_result_list.append(child.before) if type(responses[index]) in (str,): child.send(responses[index]) @@ -189,7 +190,7 @@ def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None break else: logger.warning("TypeError ('The callback must be a string or function type.')") - raise TypeError ('The callback must be a string or function type.') + raise TypeError('The callback must be a string or function type.') event_count = event_count + 1 except TIMEOUT: child_result_list.append(child.before) @@ -204,15 +205,16 @@ def run (command, timeout=-1, withexitstatus=False, events=None, extra_args=None else: return child_result - + class SpawnBase: def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None, - logfile=None, cwd=None, env=None, codepage=None, echo=True, safe_exit=True, interact=False, **kwargs): + logfile=None, cwd=None, env=None, codepage=None, echo=True, safe_exit=True, + interact=False, **kwargs): """This starts the given command in a child process. This does all the fork/exec type of stuff for a pty. This is called by __init__. If args is empty then command will be parsed (split on spaces) and args will be - set to parsed arguments. - + set to parsed arguments. + The pid and child_fd of this object get set by this method. Note that it is difficult for this method to fail. You cannot detect if the child process cannot start. @@ -222,12 +224,12 @@ class SpawnBase: That may not necessarily be bad because you may haved spawned a child that performs some task; creates no stdout output; and then dies. """ - self.host_pid = os.getpid() # That's me + self.host_pid = os.getpid() # That's me self.console_process = None self.console_pid = None self.child_process = None self.child_pid = None - + self.safe_exit = safe_exit self.searcher = None self.ignorecase = False @@ -237,23 +239,26 @@ class SpawnBase: self.match_index = None self.terminated = True self.exitstatus = None - self.status = None # status returned by os.waitpid + self.status = None # status returned by os.waitpid self.flag_eof = False self.flag_child_finished = False - self.child_fd = -1 # initially closed + self.child_fd = -1 # initially closed self.timeout = timeout self.delimiter = EOF self.codepage = codepage self.cwd = cwd self.env = env self.echo = echo - self.maxread = maxread # max bytes to read at one time into buffer - self.delaybeforesend = 0.1 # Sets sleep time used just before sending data to child. Time in seconds. - self.delayafterterminate = 0.1 # Sets delay in terminate() method to allow kernel time to update process status. Time in seconds. - self.buffer = '' # This is the read buffer. See maxread. - self.searchwindowsize = searchwindowsize # Anything before searchwindowsize point is preserved, but not searched. + self.maxread = maxread # max bytes to read at one time into buffer + # delaybeforesend: Sets sleep time used just before sending data to child. Time in seconds. + self.delaybeforesend = 0.1 + # delayafterterminate: Sets delay in terminate() method to allow kernel time to update + # process status. Time in seconds. + self.delayafterterminate = 0.1 + self.buffer = '' # This is the read buffer. See maxread. + # searchwindowsize: Anything before searchwindowsize point is preserved, but not searched. + self.searchwindowsize = searchwindowsize self.interact_state = interact - # If command is an int type then it may represent a file descriptor. if type(command) == type(0): @@ -263,36 +268,37 @@ class SpawnBase: if type (args) != type([]): logger.warning("TypeError ('The argument, args, must be a list.')") raise TypeError ('The argument, args, must be a list.') - + if args == []: self.args = split_command_line(command) self.command = self.args[0] else: - self.args = args[:] # work with a copy - self.args.insert (0, command) - self.command = command - + self.args = args[:] # work with a copy + self.args.insert(0, command) + self.command = command + command_with_path = shutil.which(self.command) if command_with_path is None: logger.warning('The command was not found or was not executable: %s.' % self.command) - raise ExceptionPexpect ('The command was not found or was not executable: %s.' % self.command) + raise ExceptionPexpect( + 'The command was not found or was not executable: %s.' % self.command) self.command = command_with_path self.args[0] = self.command - self.name = '<' + ' '.join (self.args) + '>' - + self.name = '<' + ' '.join(self.args) + '>' + self.terminated = False self.closed = False - + self.child_fd = self.startChild(self.args, self.env) self.get_child_process() logger.info(f'Child pid: {self.child_pid} Console pid: {self.console_pid}') self.connect_to_child() - + def __del__(self): """This makes sure that no system resources are left open. Python only garbage collects Python objects, not the child console.""" - + try: logger.info('Deleting...') if self.child_process is not None: @@ -300,10 +306,10 @@ class SpawnBase: self.disconnect_from_child() if self.safe_exit: self.wait() - except: + except Exception: traceback.print_exc() logger.warning(traceback.format_exc()) - + def __str__(self): """This returns a human-readable string that represents the state of the object. """ @@ -336,9 +342,9 @@ class SpawnBase: si = win32process.GetStartupInfo() si.dwFlags = win32process.STARTF_USESHOWWINDOW si.wShowWindow = win32con.SW_HIDE - - dirname = os.path.dirname(sys.executable - if getattr(sys, 'frozen', False) else + + dirname = os.path.dirname(sys.executable + if getattr(sys, 'frozen', False) else os.path.abspath(__file__)) spath = [os.path.dirname(dirname)] pyargs = ['-c'] @@ -346,33 +352,35 @@ class SpawnBase: # If we are running 'frozen', add library.zip and lib\library.zip to sys.path # py2exe: Needs appropriate 'zipfile' option in setup script and 'bundle_files' 3 spath.append(os.path.join(dirname, 'library.zip')) - spath.append(os.path.join(dirname, 'library.zip', + spath.append(os.path.join(dirname, 'library.zip', os.path.basename(os.path.splitext(sys.executable)[0]))) if os.path.isdir(os.path.join(dirname, 'lib')): dirname = os.path.join(dirname, 'lib') spath.append(os.path.join(dirname, 'library.zip')) - spath.append(os.path.join(dirname, 'library.zip', + spath.append(os.path.join(dirname, 'library.zip', os.path.basename(os.path.splitext(sys.executable)[0]))) pyargs.insert(0, '-S') # skip 'import site' - + if getattr(sys, 'frozen', False): - python_executable = os.path.join(dirname, 'python.exe') + python_executable = os.path.join(dirname, 'python.exe') else: python_executable = os.path.join(os.path.dirname(sys.executable), 'python.exe') - - self.console_class_parameters.update({ + + self.console_class_parameters.update( + { 'host_pid': self.host_pid, 'local_echo': self.echo, 'interact': self.interact_state, 'codepage': self.codepage - }) - console_class_parameters_kv_pairs = [f'{k}={v}' for k,v in self.console_class_parameters.items() ] + } + ) + console_class_parameters_kv_pairs = [f'{k}={v}' for k, v in self.console_class_parameters.items()] console_class_parameters_str = ', '.join(console_class_parameters_kv_pairs) - + child_class_initializator = f"cons = wexpect.{self.console_class_name}(wexpect.join_args({args}), {console_class_parameters_str});" - - commandLine = '"%s" %s "%s"' % (python_executable, - ' '.join(pyargs), + + commandLine = '"%s" %s "%s"' % (python_executable, + ' '.join(pyargs), "import sys;" f"sys.path = {spath} + sys.path;" "import wexpect;" @@ -382,64 +390,64 @@ class SpawnBase: "wexpect.console_reader.logger.info(f'Console finished2. {cons.child_exitstatus}');" "sys.exit(cons.child_exitstatus)" ) - + logger.info(f'Console starter command:{commandLine}') - - _, _, self.console_pid, __otid = win32process.CreateProcess(None, commandLine, None, None, False, - win32process.CREATE_NEW_CONSOLE, None, self.cwd, si) - + + _, _, self.console_pid, __otid = win32process.CreateProcess( + None, commandLine, None, None, False, win32process.CREATE_NEW_CONSOLE, None, self.cwd, si) + def get_console_process(self, force=False): if force or self.console_process is None: self.console_process = psutil.Process(self.console_pid) return self.console_process - + def get_child_process(self, force=False): if force or self.console_process is None: self.child_process = self.get_console_process() self.child_pid = self.child_process.pid return self.child_process - + def close(self): # File-like object. """ Closes the child console.""" - + self.closed = self.terminate() - + def terminate(self, force=False): """Terminate the child. Force not used. """ if not self.isalive(): return True - + self.kill() time.sleep(self.delayafterterminate) if not self.isalive(): return True - + return False - + def isalive(self, trust_console=True): """True if the child is still alive, false otherwise""" if trust_console: if self.flag_eof: return False - + if self.child_process is None: # Child process has not been started... Not alive return False - + try: self.exitstatus = self.child_process.wait(timeout=0) logger.info(f'exitstatus: {self.exitstatus}') except psutil.TimeoutExpired: return True - + def kill(self, sig=signal.SIGTERM): """Sig == sigint for ctrl-c otherwise the child is terminated.""" try: self.child_process.send_signal(sig) except psutil.NoSuchProcess as e: logger.info('Child has already died. %s', e) - + def wait(self, child=True, console=False): if child: self.exitstatus = self.child_process.wait() @@ -448,8 +456,8 @@ class SpawnBase: self.exitstatus = self.console_process.wait() logger.info(f'exitstatus: {self.exitstatus}') return self.exitstatus - - def read (self, size = -1): # File-like object. + + def read(self, size=-1): # File-like object. """This reads at most "size" bytes from the file (less if the read hits EOF before obtaining size bytes). If the size argument is negative or omitted, read all data until EOF is reached. The bytes are returned as @@ -459,7 +467,7 @@ class SpawnBase: if size == 0: return '' if size < 0: - self.expect (self.delimiter) # delimiter default is EOF + self.expect(self.delimiter) # delimiter default is EOF return self.before # I could have done this more directly by not using expect(), but @@ -470,12 +478,12 @@ class SpawnBase: # Note, it's OK if size==-1 in the regex. That just means it # will never match anything in which case we stop only on EOF. cre = re.compile('.{%d}' % size, re.DOTALL) - index = self.expect ([cre, self.delimiter]) # delimiter default is EOF + index = self.expect([cre, self.delimiter]) # delimiter default is EOF if index == 0: - return self.after ### self.before should be ''. Should I assert this? + return self.after # self.before should be ''. Should I assert this? return self.before - def readline (self, size = -1): # File-like object. + def readline(self, size=-1): # File-like object. """This reads and returns one entire line. A trailing newline is kept in the string, but may be absent when a file ends with an incomplete line. Note: This readline() looks for a \\r\\n pair even on UNIX @@ -487,24 +495,24 @@ class SpawnBase: if size == 0: return '' - index = self.expect (['\r\n', self.delimiter]) # delimiter default is EOF + index = self.expect(['\r\n', self.delimiter]) # delimiter default is EOF if index == 0: return self.before + '\r\n' else: return self.before - def __iter__ (self): # File-like object. + def __iter__(self): # File-like object. """This is to support iterators over a file-like object. """ return self - def read_nonblocking (self, size = 1): + def read_nonblocking(self, size=1): """Virtual definition """ raise NotImplementedError - def __next__ (self): # File-like object. + def __next__(self): # File-like object. """This is to support iterators over a file-like object. """ @@ -519,7 +527,7 @@ class SpawnBase: def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() - def readlines (self, sizehint = -1): # File-like object. + def readlines(self, sizehint=-1): # File-like object. """This reads until EOF using readline() and returns a list containing the lines thus read. The optional "sizehint" argument is ignored. """ @@ -533,17 +541,16 @@ class SpawnBase: def isatty(self): # File-like object. """The child is always created with a console.""" - + return True def write(self, s): # File-like object. - """This is similar to send() except that there is no return value. """ self.send(s) - def writelines (self, sequence): # File-like object. + def writelines(self, sequence): # File-like object. """This calls write() for each element in the sequence. The sequence can be any iterable object producing strings, typically a list of strings. This does not add line separators There is no return value. @@ -553,11 +560,10 @@ class SpawnBase: self.write(s) def sendline(self, s=''): - """This is like send(), but it adds a line feed (os.linesep). This returns the number of bytes written. """ - n = self.send(s+'\r\n') + n = self.send(s + '\r\n') return n def sendeof(self): @@ -573,33 +579,33 @@ class SpawnBase: # platform does not define VEOF so assume CTRL-D char = chr(4) self.send(char) - + def send(self, s, delaybeforesend=None): """Virtual definition """ if delaybeforesend is None: delaybeforesend = self.delaybeforesend - + if delaybeforesend: time.sleep(delaybeforesend) - + return self._send_impl(s) - + def _send_impl(self, s): """Virtual definition """ raise NotImplementedError - + def connect_to_child(self): """Virtual definition """ raise NotImplementedError - + def disconnect_from_child(self): """Virtual definition """ raise NotImplementedError - + def compile_pattern_list(self, patterns): """This compiles a pattern-string or a list of pattern-strings. Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or a list of @@ -629,7 +635,7 @@ class SpawnBase: if type(patterns) is not list: patterns = [patterns] - compile_flags = re.DOTALL # Allow dot to match \n + compile_flags = re.DOTALL # Allow dot to match \n if self.ignorecase: compile_flags = compile_flags | re.IGNORECASE compiled_pattern_list = [] @@ -643,12 +649,16 @@ class SpawnBase: elif type(p) is type(re.compile('')): compiled_pattern_list.append(p) else: - logger.warning("TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or a list of those type. %s' % str(type(p)))") - raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or a list of those type. %s' % str(type(p))) + logger.warning( + "TypeError: 'Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or" + " a list of those type. %s' % str(type(p))") + raise TypeError( + 'Argument must be one of StringTypes, EOF, TIMEOUT, SRE_Pattern, or a list of' + ' those type. %s' % str(type(p))) return compiled_pattern_list - def expect(self, pattern, timeout = -1, searchwindowsize=None): + def expect(self, pattern, timeout=-1, searchwindowsize=None): """This seeks through the stream until a pattern is matched. The pattern is overloaded and may take several types. The pattern can be a StringType, EOF, a compiled re, or a list of any of those types. @@ -727,7 +737,7 @@ class SpawnBase: compiled_pattern_list = self.compile_pattern_list(pattern) return self.expect_list(compiled_pattern_list, timeout, searchwindowsize) - def expect_list(self, pattern_list, timeout = -1, searchwindowsize = -1): + def expect_list(self, pattern_list, timeout=-1, searchwindowsize=-1): """This takes a list of compiled regular expressions and returns the index into the pattern_list that matched the child output. The list may also contain EOF or TIMEOUT (which are not compiled regular @@ -740,7 +750,7 @@ class SpawnBase: return self.expect_loop(searcher_re(pattern_list), timeout, searchwindowsize) - def expect_exact(self, pattern_list, timeout = -1, searchwindowsize = -1): + def expect_exact(self, pattern_list, timeout=-1, searchwindowsize=-1): """This is similar to expect(), but uses plain string matching instead of compiled regular expressions in 'pattern_list'. The 'pattern_list' may be a string; a list or other sequence of strings; or TIMEOUT and @@ -753,17 +763,21 @@ class SpawnBase: This method is also useful when you don't want to have to worry about escaping regular expression characters that you want to match.""" - if not isinstance(pattern_list, list): + if not isinstance(pattern_list, list): pattern_list = [pattern_list] - + for p in pattern_list: if type(p) not in (str,) and p not in (TIMEOUT, EOF): - logger.warning('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p))) - raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p))) - + logger.warning( + 'TypeError: Argument must be one of StringTypes, EOF, TIMEOUT, or a list of' + ' those type. %s' % str(type(p))) + raise TypeError( + 'Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. ' + '%s' % str(type(p))) + return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize) - def expect_loop(self, searcher, timeout = -1, searchwindowsize = -1): + def expect_loop(self, searcher, timeout=-1, searchwindowsize=-1): """This is the common loop used inside expect. The 'searcher' should be an instance of searcher_re or searcher_string, which describes how and what to search for in the input. @@ -775,33 +789,33 @@ class SpawnBase: if timeout == -1: timeout = self.timeout if timeout is not None: - end_time = time.time() + timeout + end_time = time.time() + timeout if searchwindowsize == -1: searchwindowsize = self.searchwindowsize - + logger.debug(f'searcher: {searcher}') try: incoming = self.buffer freshlen = len(incoming) - while True: # Keep reading until exception or return. + while True: # Keep reading until exception or return. index = searcher.search(incoming, freshlen, searchwindowsize) if index >= 0: - self.buffer = incoming[searcher.end : ] - self.before = incoming[ : searcher.start] - self.after = incoming[searcher.start : searcher.end] + self.buffer = incoming[searcher.end:] + self.before = incoming[:searcher.start] + self.after = incoming[searcher.start:searcher.end] self.match = searcher.match self.match_index = index return self.match_index # No match at this point if timeout is not None and end_time < time.time(): logger.info('Timeout exceeded in expect_any().') - raise TIMEOUT ('Timeout exceeded in expect_any().') - # Still have time left, so read more data + raise TIMEOUT('Timeout exceeded in expect_any().') + # Still have time left, so read more data self.isalive() c = self.read_nonblocking(self.maxread) freshlen = len(c) - time.sleep (0.01) + time.sleep(0.01) incoming += c except EOF as e: self.buffer = '' @@ -831,26 +845,32 @@ class SpawnBase: self.match_index = None logger.info(f'TIMEOUT: {e}\n{self}') raise TIMEOUT(f'{e}\n{self}') - except: + except Exception: self.before = incoming self.after = None self.match = None self.match_index = None raise + class SpawnPipe(SpawnBase): - + def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None, - logfile=None, cwd=None, env=None, codepage=None, echo=True, interact=False, **kwargs): + logfile=None, cwd=None, env=None, codepage=None, echo=True, interact=False, + **kwargs): self.pipe = None self.console_class_name = 'ConsoleReaderPipe' self.console_class_parameters = {} - - super().__init__(command=command, args=args, timeout=timeout, maxread=maxread, - searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo, interact=interact) - - self.delayafterterminate = 1 # Sets delay in terminate() method to allow kernel time to update process status. Time in seconds. - + + super().__init__( + command=command, args=args, timeout=timeout, maxread=maxread, + searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo, + interact=interact) + + # Sets delay in terminate() method to allow kernel time to update process status. Time in + # seconds. + self.delayafterterminate = 1 + def connect_to_child(self): pipe_name = 'wexpect_{}'.format(self.console_pid) pipe_full_path = r'\\.\pipe\{}'.format(pipe_name) @@ -867,22 +887,23 @@ class SpawnPipe(SpawnBase): None ) logger.debug('Pipe found') - res = win32pipe.SetNamedPipeHandleState(self.pipe, win32pipe.PIPE_READMODE_MESSAGE, None, None) + res = win32pipe.SetNamedPipeHandleState(self.pipe, win32pipe.PIPE_READMODE_MESSAGE, + None, None) if res == 0: logger.debug(f"SetNamedPipeHandleState return code: {res}") return except pywintypes.error as e: - if e.args[0] == winerror.ERROR_FILE_NOT_FOUND: #2 + if e.args[0] == winerror.ERROR_FILE_NOT_FOUND: # 2 logger.debug("no pipe, trying again in a bit later") time.sleep(0.2) else: raise - + def disconnect_from_child(self): if self.pipe: win32file.CloseHandle(self.pipe) - - def read_nonblocking (self, size = 1): + + def read_nonblocking(self, size=1): """This reads at most size characters from the child application. If the end of file is read then an EOF exception will be raised. @@ -895,24 +916,24 @@ class SpawnPipe(SpawnBase): if self.closed: logger.warning('I/O operation on closed file in read_nonblocking().') - raise ValueError ('I/O operation on closed file in read_nonblocking().') - + raise ValueError('I/O operation on closed file in read_nonblocking().') + try: s = win32file.ReadFile(self.pipe, size)[1] - + if s: logger.debug(f'Readed: {s}') else: logger.spam(f'Readed: {s}') - + if b'\x04' in s: self.flag_eof = True logger.info("EOF: EOF character has been arrived") raise EOF('EOF character has been arrived') - + return s.decode() except pywintypes.error as e: - if e.args[0] == winerror.ERROR_BROKEN_PIPE: #109 + if e.args[0] == winerror.ERROR_BROKEN_PIPE: # 109 self.flag_eof = True logger.info("EOF('broken pipe, bye bye')") raise EOF('broken pipe, bye bye') @@ -924,7 +945,7 @@ class SpawnPipe(SpawnBase): raise EOF('The pipe is being closed.') else: raise - + def _send_impl(self, s): """This sends a string to the child process. This returns the number of bytes written. If a log file was set then the data is also written to @@ -937,7 +958,7 @@ class SpawnPipe(SpawnBase): win32file.WriteFile(self.pipe, s) logger.spam(f"WriteFile finished.") except pywintypes.error as e: - if e.args[0] == winerror.ERROR_BROKEN_PIPE: #109 + if e.args[0] == winerror.ERROR_BROKEN_PIPE: # 109 logger.info("EOF: broken pipe, bye bye") raise EOF("broken pipe, bye bye") elif e.args[0] == winerror.ERROR_NO_DATA: @@ -947,9 +968,9 @@ class SpawnPipe(SpawnBase): logger.info("The pipe is being closed.") raise EOF("The pipe is being closed.") else: - raise + raise return len(s) - + def kill(self, sig=signal.SIGTERM): """Sig == sigint for ctrl-c otherwise the child is terminated.""" try: @@ -961,20 +982,25 @@ class SpawnPipe(SpawnBase): class SpawnSocket(SpawnBase): - + def __init__(self, command, args=[], timeout=30, maxread=60000, searchwindowsize=None, - logfile=None, cwd=None, env=None, codepage=None, echo=True, port=4321, host='127.0.0.1', interact=False): + logfile=None, cwd=None, env=None, codepage=None, echo=True, port=4321, + host='127.0.0.1', interact=False): self.port = port self.host = host self.sock = None self.console_class_name = 'ConsoleReaderSocket' self.console_class_parameters = {'port': port} - - super().__init__(command=command, args=args, timeout=timeout, maxread=maxread, - searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo, interact=interact) - - self.delayafterterminate = 1 # Sets delay in terminate() method to allow kernel time to update process status. Time in seconds. - + + super().__init__( + command=command, args=args, timeout=timeout, maxread=maxread, + searchwindowsize=searchwindowsize, cwd=cwd, env=env, codepage=codepage, echo=echo, + interact=interact) + + # Sets delay in terminate() method to allow kernel time to update process status. Time in + # seconds. + self.delayafterterminate = 1 + def _send_impl(self, s): """This sends a string to the child process. This returns the number of bytes written. If a log file was set then the data is also written to @@ -983,18 +1009,18 @@ class SpawnSocket(SpawnBase): s = str.encode(s) self.sock.sendall(s) return len(s) - + def connect_to_child(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect((self.host, self.port)) self.sock.settimeout(.2) - + def disconnect_from_child(self): if self.sock: self.sock.close() self.sock = None - def read_nonblocking (self, size = 1): + def read_nonblocking(self, size=1): """This reads at most size characters from the child application. If the end of file is read then an EOF exception will be raised. @@ -1007,22 +1033,21 @@ class SpawnSocket(SpawnBase): if self.closed: logger.info('I/O operation on closed file in read_nonblocking().') - raise ValueError ('I/O operation on closed file in read_nonblocking().') - + raise ValueError('I/O operation on closed file in read_nonblocking().') + try: s = self.sock.recv(size) - + if s: logger.debug(f'Readed: {s}') else: logger.spam(f'Readed: {s}') - - + if EOF_CHAR in s: self.flag_eof = True logger.info("EOF: EOF character has been arrived") raise EOF('EOF character has been arrived') - + except ConnectionResetError: self.flag_eof = True logger.info("EOF('ConnectionResetError')") @@ -1031,7 +1056,7 @@ class SpawnSocket(SpawnBase): return '' return s.decode() - + def kill(self, sig=signal.SIGTERM): """Sig == sigint for ctrl-c otherwise the child is terminated.""" try: @@ -1039,7 +1064,7 @@ class SpawnSocket(SpawnBase): self.send(SIGNAL_CHARS[sig]) except EOF as e: logger.info(e) - + class searcher_re (object): """This is regular expression string search helper for the @@ -1080,12 +1105,12 @@ class searcher_re (object): """This returns a human-readable string that represents the state of the object.""" - ss = [ (n,' %d: re.compile("%s")' % (n,str(s.pattern))) for n,s in self._searches] - ss.append((-1,'searcher_re:')) + ss = [(n, ' %d: re.compile("%s")' % (n, str(s.pattern))) for n, s in self._searches] + ss.append((-1, 'searcher_re:')) if self.eof_index >= 0: - ss.append ((self.eof_index,' %d: EOF' % self.eof_index)) + ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) if self.timeout_index >= 0: - ss.append ((self.timeout_index,' %d: TIMEOUT' % self.timeout_index)) + ss.append((self.timeout_index, ' %d: TIMEOUT' % self.timeout_index)) ss.sort() ss = list(zip(*ss))[1] return '\n'.join(ss) @@ -1096,7 +1121,7 @@ class searcher_re (object): 'buffer' which have not been searched before. See class spawn for the 'searchwindowsize' argument. - + If there is a match this returns the index of that string, and sets 'start', 'end' and 'match'. Otherwise, returns -1.""" @@ -1107,7 +1132,7 @@ class searcher_re (object): if searchwindowsize is None: searchstart = 0 else: - searchstart = max(0, len(buffer)-searchwindowsize) + searchstart = max(0, len(buffer) - searchwindowsize) for index, s in self._searches: match = s.search(buffer, searchstart) if match is None: @@ -1123,8 +1148,8 @@ class searcher_re (object): self.match = the_match self.end = self.match.end() return best_index - - + + class searcher_string (object): """This is a plain string search helper for the spawn.expect_any() method. @@ -1161,12 +1186,12 @@ class searcher_string (object): """This returns a human-readable string that represents the state of the object.""" - ss = [ (ns[0],' %d: "%s"' % ns) for ns in self._strings ] - ss.append((-1,'searcher_string:')) + ss = [(ns[0], ' %d: "%s"' % ns) for ns in self._strings] + ss.append((-1, 'searcher_string:')) if self.eof_index >= 0: - ss.append ((self.eof_index,' %d: EOF' % self.eof_index)) + ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) if self.timeout_index >= 0: - ss.append ((self.timeout_index,' %d: TIMEOUT' % self.timeout_index)) + ss.append((self.timeout_index, ' %d: TIMEOUT' % self.timeout_index)) ss.sort() ss = list(zip(*ss))[1] return '\n'.join(ss) @@ -1196,12 +1221,12 @@ class searcher_string (object): # rescanning until we've read three more bytes. # # Sadly, I don't know enough about this interesting topic. /grahn - + for index, s in self._strings: if searchwindowsize is None: # the match, if any, can only be in the fresh data, # or at the very end of the old data - offset = -(freshlen+len(s)) + offset = -(freshlen + len(s)) else: # better obey searchwindowsize offset = -searchwindowsize diff --git a/wexpect/legacy_wexpect.py b/wexpect/legacy_wexpect.py index 34e1a7a..84f5d84 100644 --- a/wexpect/legacy_wexpect.py +++ b/wexpect/legacy_wexpect.py @@ -7,7 +7,7 @@ scripts for duplicating software package installations on different servers. It can be used for automated software testing. Wexpect is in the spirit of Don Libes' Expect, but Wexpect is pure Python. Other Expect-like modules for Python require TCL and Expect or require C extensions to be compiled. Wexpect does not -use C, Expect, or TCL extensions. +use C, Expect, or TCL extensions. There are two main interfaces to Wexpect -- the function, run() and the class, spawn. You can call the run() function to execute a command and return the @@ -80,7 +80,7 @@ import shutil import types import traceback import signal -import pkg_resources +import pkg_resources from io import StringIO try: @@ -96,9 +96,9 @@ try: except ImportError as e: # pragma: no cover raise ImportError(str(e) + "\nThis package requires the win32 python packages.\r\nInstall with pip install pywin32") -# +# # System-wide constants -# +# screenbufferfillchar = '\4' maxconsoleY = 8000 @@ -368,9 +368,9 @@ def spawn(command, args=[], timeout=30, maxread=2000, searchwindowsize=None, log logger.debug('\t%s=%s' % (name, env[name])) if cwd: logger.debug('Working directory: %s' % cwd) - + return spawn_windows(command, args, timeout, maxread, searchwindowsize, logfile, cwd, env, - codepage, echo=echo) + codepage, echo=echo) class spawn_windows (): """This is the main class interface for Wexpect. Use this class to start @@ -381,7 +381,7 @@ class spawn_windows (): """ The spawn_windows constructor. Do not call it directly. Use spawn(), or run() instead. """ self.codepage = codepage - + self.stdin = sys.stdin self.stdout = sys.stdout self.stderr = sys.stderr @@ -414,7 +414,7 @@ class spawn_windows (): self.ocwd = os.getcwd() self.cwd = cwd self.env = env - + # allow dummy instances for subclasses that may not use command or args. if command is None: self.command = None @@ -426,12 +426,12 @@ class spawn_windows (): def __del__(self): """This makes sure that no system resources are left open. Python only garbage collects Python objects, not the child console.""" - + try: self.wtty.terminate_child() except: pass - + def __str__(self): """This returns a human-readable string that represents the state of @@ -463,7 +463,7 @@ class spawn_windows (): s.append('delaybeforesend: ' + str(self.delaybeforesend)) s.append('delayafterterminate: ' + str(self.delayafterterminate)) return '\n'.join(s) - + def _spawn(self,command,args=[], echo=True): """This starts the given command in a child process. This does all the fork/exec type of stuff for a pty. This is called by __init__. If args @@ -487,15 +487,15 @@ class spawn_windows (): if type (args) != type([]): logger.info('TypeError: The argument, args, must be a list.') raise TypeError ('The argument, args, must be a list.') - + if args == []: self.args = split_command_line(command) self.command = self.args[0] else: self.args = args[:] # work with a copy self.args.insert (0, command) - self.command = command - + self.command = command + command_with_path = shutil.which(self.command) if command_with_path is None: logger.info('ExceptionPexpect: The command was not found or was not executable: %s.' % self.command) @@ -508,30 +508,30 @@ class spawn_windows (): #assert self.pid is None, 'The pid member should be None.' #assert self.command is not None, 'The command member should not be None.' - self.wtty = Wtty(codepage=self.codepage, echo=echo) - + self.wtty = Wtty(codepage=self.codepage, echo=echo) + if self.cwd is not None: os.chdir(self.cwd) - + self.child_fd = self.wtty.spawn(self.command, self.args, self.env) - + if self.cwd is not None: # Restore the original working dir os.chdir(self.ocwd) - + self.terminated = False self.closed = False self.pid = self.wtty.pid - + def fileno (self): # File-like object. """There is no child fd.""" - + return 0 def close(self, force=True): # File-like object. """ Closes the child console.""" - + self.closed = self.terminate(force) if not self.closed: logger.info('ExceptionPexpect: close() could not terminate the child using terminate()') @@ -540,7 +540,7 @@ class spawn_windows (): def isatty(self): # File-like object. """The child is always created with a console.""" - + return True def waitnoecho (self, timeout=-1): @@ -564,7 +564,7 @@ class spawn_windows (): if timeout == -1: timeout = self.timeout if timeout is not None: - end_time = time.time() + timeout + end_time = time.time() + timeout while True: if not self.getecho(): return True @@ -583,9 +583,9 @@ class spawn_windows (): def setecho (self, state): """This sets the terminal echo mode on or off.""" - + self.wtty.setecho(state) - + def read (self, size = -1): # File-like object. """This reads at most "size" bytes from the file (less if the read hits @@ -682,7 +682,7 @@ class spawn_windows (): if self.closed: logger.info('ValueError: I/O operation on closed file in read_nonblocking().') raise ValueError ('I/O operation on closed file in read_nonblocking().') - + try: # The real child and it's console are two different process. The console dies 0.1 sec # later to be able to read the child's last output (before EOF). So here we check @@ -698,7 +698,7 @@ class spawn_windows (): except EOF: self.flag_eof = True raise - + if self.logfile is not None: self.logfile.write (s) self.logfile.flush() @@ -753,7 +753,7 @@ class spawn_windows (): """This sends a string to the child process. This returns the number of bytes written. If a log file was set then the data is also written to the log. """ - + (self.delaybeforesend) if self.logfile is not None: self.logfile.write (s) @@ -767,7 +767,7 @@ class spawn_windows (): def sendintr(self): """This sends a SIGINT to the child. It does not require the SIGINT to be the first character on a line. """ - + self.wtty.sendintr() def eof (self): @@ -782,47 +782,47 @@ class spawn_windows (): if not self.isalive(): return True - + self.wtty.terminate_child() time.sleep(self.delayafterterminate) if not self.isalive(): return True - + return False def kill(self, sig=signal.SIGTERM): """Sig == sigint for ctrl-c otherwise the child is terminated.""" - + if not self.isalive(): return - + if sig == signal.SIGINT: self.wtty.sendintr() else: self.wtty.terminate_child() - + def wait(self): """This waits until the child exits. This is a blocking call. This will not read any data from the child, so this will block forever if the child has unread output and has terminated. In other words, the child may have printed output then called exit(); but, technically, the child is still alive until its output is read.""" - - # We can't use os.waitpid under Windows because of 'permission denied' - # exception? Perhaps if not running as admin (or UAC enabled under + + # We can't use os.waitpid under Windows because of 'permission denied' + # exception? Perhaps if not running as admin (or UAC enabled under # Vista/7). Simply loop and wait for child to exit. while self.isalive(): time.sleep(.05) # Keep CPU utilization down - + return self.exitstatus - + def isalive(self): """Determines if the child is still alive.""" - + if self.terminated: logger.debug('self.terminated is true') return False - + if self.wtty.isalive(): return True else: @@ -989,14 +989,14 @@ class spawn_windows (): This method is also useful when you don't want to have to worry about escaping regular expression characters that you want to match.""" - if not isinstance(pattern_list, list): + if not isinstance(pattern_list, list): pattern_list = [pattern_list] - + for p in pattern_list: if type(p) not in (str,) and p not in (TIMEOUT, EOF): logger.info('TypeError: Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p))) raise TypeError ('Argument must be one of StringTypes, EOF, TIMEOUT, or a list of those type. %s' % str(type(p))) - + return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize) def expect_loop(self, searcher, timeout = -1, searchwindowsize = -1): @@ -1012,7 +1012,7 @@ class spawn_windows (): if timeout == -1: timeout = self.timeout if timeout is not None: - end_time = time.time() + timeout + end_time = time.time() + timeout if searchwindowsize == -1: searchwindowsize = self.searchwindowsize @@ -1075,22 +1075,22 @@ class spawn_windows (): def getwinsize(self): """This returns the terminal window size of the child tty. The return value is a tuple of (rows, cols). """ - + return self.wtty.getwinsize() def setwinsize(self, r, c): """Set the size of the child screen buffer. """ - + self.wtty.setwinsize(r, c) - + def interact(self): """Makes the child console visible for interaction""" - + self.wtty.interact() - + def stop_interact(self): """Hides the child console from the user.""" - + self.wtty.stop_interact() ############################################################################## @@ -1122,15 +1122,15 @@ class Wtty: self.timeout = timeout self.totalRead = 0 self.local_echo = echo - + def spawn(self, command, args=[], env=None): """Spawns spawner.py with correct arguments.""" - + ts = time.time() self.startChild(args, env) - + logger.info(f"Fetch child's process and pid...") - + while True: msg = win32gui.GetMessage(0, 0, 0) childPid = msg[1][2] @@ -1150,33 +1150,33 @@ class Wtty: self.pid = childPid break time.sleep(.05) - + logger.info(f"Child's pid: {self.pid}") - + if not self.__childProcess: logger.info('ExceptionPexpect: The process ' + args[0] + ' could not be started.') - raise ExceptionPexpect ('The process ' + args[0] + ' could not be started.') - - - + raise ExceptionPexpect ('The process ' + args[0] + ' could not be started.') + + + winHandle = int(win32console.GetConsoleWindow()) - + self.__switch = True - + if winHandle != 0: - self.__parentPid = win32process.GetWindowThreadProcessId(winHandle)[1] + self.__parentPid = win32process.GetWindowThreadProcessId(winHandle)[1] # Do we have a console attached? Do not rely on winHandle, because - # it will also be non-zero if we didn't have a console, and then + # it will also be non-zero if we didn't have a console, and then # spawned a child process! Using sys.stdout.isatty() seems safe self.console = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() # If the original process had a console, record a list of attached - # processes so we can check if we need to reattach/reallocate the + # processes so we can check if we need to reattach/reallocate the # console later self.processList = win32console.GetConsoleProcessList() else: self.switchTo(False) self.__switch = False - + def startChild(self, args, env): si = win32process.GetStartupInfo() si.dwFlags = win32process.STARTF_USESHOWWINDOW @@ -1184,8 +1184,8 @@ class Wtty: # Determine the directory of wexpect.py or, if we are running 'frozen' # (eg. py2exe deployment), of the packed executable - dirname = os.path.dirname(sys.executable - if getattr(sys, 'frozen', False) else + dirname = os.path.dirname(sys.executable + if getattr(sys, 'frozen', False) else os.path.abspath(__file__)) if getattr(sys, 'frozen', False): logdir = os.path.splitext(sys.executable)[0] @@ -1197,15 +1197,15 @@ class Wtty: if getattr(sys, 'frozen', False): # If we are running 'frozen', add library.zip and lib\library.zip # to sys.path - # py2exe: Needs appropriate 'zipfile' option in setup script and + # py2exe: Needs appropriate 'zipfile' option in setup script and # 'bundle_files' 3 spath.append(os.path.join(dirname, 'library.zip')) - spath.append(os.path.join(dirname, 'library.zip', + spath.append(os.path.join(dirname, 'library.zip', os.path.basename(os.path.splitext(sys.executable)[0]))) if os.path.isdir(os.path.join(dirname, 'lib')): dirname = os.path.join(dirname, 'lib') spath.append(os.path.join(dirname, 'library.zip')) - spath.append(os.path.join(dirname, 'library.zip', + spath.append(os.path.join(dirname, 'library.zip', os.path.basename(os.path.splitext(sys.executable)[0]))) pyargs.insert(0, '-S') # skip 'import site' pid = win32process.GetCurrentProcessId() @@ -1213,37 +1213,37 @@ class Wtty: cp = self.codepage or windll.kernel32.GetACP() # If we are running 'frozen', expect python.exe in the same directory # as the packed executable. - # py2exe: The python executable can be included via setup script by + # py2exe: The python executable can be included via setup script by # adding it to 'data_files' - + if getattr(sys, 'frozen', False): - python_executable = os.path.join(dirname, 'python.exe') + python_executable = os.path.join(dirname, 'python.exe') else: python_executable = os.path.join(os.path.dirname(sys.executable), 'python.exe') - - commandLine = '"%s" %s "%s"' % (python_executable, - ' '.join(pyargs), + + commandLine = '"%s" %s "%s"' % (python_executable, + ' '.join(pyargs), f"import sys; sys.path = {spath} + sys.path;" f"args = {args}; import wexpect;" f"wexpect.ConsoleReader(wexpect.join_args(args), {pid}, {tid}, cp={cp}, logdir={logdir})" ) - + logger.info(f'CreateProcess: {commandLine}') - - self.__oproc, x, self.conpid, self.__otid = win32process.CreateProcess(None, commandLine, None, None, False, + + self.__oproc, x, self.conpid, self.__otid = win32process.CreateProcess(None, commandLine, None, None, False, win32process.CREATE_NEW_CONSOLE, env, None, si) logger.info(f'self.__oproc: {self.__oproc}') logger.info(f'x: {x}') logger.info(f'self.conpid: {self.conpid}') logger.info(f'self.__otid: {self.__otid}') - + def switchTo(self, attatched=True): """Releases from the current console and attatches to the childs.""" - + if not self.__switch: return - + try: # No 'attached' check is needed, FreeConsole() can be called multiple times. win32console.FreeConsole() @@ -1254,11 +1254,11 @@ class Wtty: # When child has finished... logger.info('EOF: End Of File (EOF) in switchTo().') raise EOF('End Of File (EOF) in switchTo().') - + win32console.AttachConsole(self.conpid) self.__consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE) self.__consout = self.getConsoleOut() - + except pywintypes.error as e: # pywintypes.error: (5, 'AttachConsole', 'Access is denied.') # When child has finished... @@ -1274,15 +1274,15 @@ class Wtty: self.switchBack() logger.info(traceback.format_exc()) raise - - + + def switchBack(self): - """Releases from the current console and attaches + """Releases from the current console and attaches to the parents.""" if not self.__switch: return - + if self.console: # If we originally had a console, re-attach it (or allocate a new one) # If we didn't have a console to begin with, there's no need to @@ -1294,44 +1294,44 @@ class Wtty: else: # Our original console has been free'd, allocate a new one win32console.AllocConsole() - + self.__consin = None self.__consout = None - + def getConsoleOut(self): - consout = win32file.CreateFile('CONOUT$', - win32con.GENERIC_READ | win32con.GENERIC_WRITE, - win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, - None, - win32con.OPEN_EXISTING, - 0, + consout = win32file.CreateFile('CONOUT$', + win32con.GENERIC_READ | win32con.GENERIC_WRITE, + win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, + None, + win32con.OPEN_EXISTING, + 0, 0) - - return win32console.PyConsoleScreenBufferType(consout) - + + return win32console.PyConsoleScreenBufferType(consout) + def getchild(self): """Returns a handle to the child process.""" - + return self.__childProcess - + def terminate_child(self): """Terminate the child process.""" win32api.TerminateProcess(self.__childProcess, 1) # win32api.win32process.TerminateProcess(self.__childProcess, 1) - + def createKeyEvent(self, char): """Creates a single key record corrosponding to the ascii character char.""" - + evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT) evt.KeyDown = True evt.Char = char evt.RepeatCount = 1 - return evt - + return evt + def write(self, s): """Writes input into the child consoles input buffer.""" - + if len(s) == 0: return 0 self.switchTo() @@ -1341,41 +1341,41 @@ class Wtty: records = [self.createKeyEvent(c) for c in str(s)] if not self.__consout: return "" - + # Store the current cursor position to hide characters in local echo disabled mode (workaround). consinfo = self.__consout.GetConsoleScreenBufferInfo() startCo = consinfo['CursorPosition'] - + # Send the string to console input wrote = self.__consin.WriteConsoleInput(records) - + # Wait until all input has been recorded by the console. ts = time.time() while self.__consin.PeekConsoleInput(8) != (): if time.time() > ts + len(s) * .1 + .5: break time.sleep(.05) - + # Hide characters in local echo disabled mode (workaround). if not self.local_echo: self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, len(s), startCo) - + return wrote finally: self.switchBack() - + def getCoord(self, offset): """Converts an offset to a point represented as a tuple.""" - + x = offset % self.__consSize[0] y = offset // self.__consSize[0] return win32console.PyCOORDType(x, y) - + def getOffset(self, coord): """Converts a tuple-point to an offset.""" - + return coord.X + coord.Y * self.__consSize[0] - + def readConsole(self, startCo, endCo): """Reads the console area from startCo to endCo and returns it as a string.""" @@ -1387,10 +1387,10 @@ class Wtty: startOff = self.getOffset(startCo) endOff = self.getOffset(endCo) readlen = endOff - startOff - + if readlen <= 0: break - + if readlen > 4000: readlen = 4000 endPoint = self.getCoord(startOff + readlen) @@ -1403,12 +1403,12 @@ class Wtty: startCo = endPoint return ''.join(buff) - + def parseData(self, s): """Ensures that special characters are interpretted as newlines or blanks, depending on if there written over characters or screen-buffer-fill characters.""" - + strlist = [] for i, c in enumerate(s): if c == screenbufferfillchar: @@ -1419,35 +1419,35 @@ class Wtty: s = ''.join(strlist) return s - - + + def readConsoleToCursor(self): """Reads from the current read position to the current cursor position and inserts the string into self.__buffer.""" - + if not self.__consout: return "" - + consinfo = self.__consout.GetConsoleScreenBufferInfo() cursorPos = consinfo['CursorPosition'] - + logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo)) isSameX = cursorPos.X == self.__currentReadCo.X isSameY = cursorPos.Y == self.__currentReadCo.Y isSamePos = isSameX and isSameY - + logger.debug('isSameY: %r' % isSameY) logger.debug('isSamePos: %r' % isSamePos) - + if isSameY or not self.lastReadData.endswith('\r\n'): # Read the current slice again self.totalRead -= self.lastRead self.__currentReadCo.X = 0 self.__currentReadCo.Y = self.__bufferY - + logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo)) - + raw = self.readConsole(self.__currentReadCo, cursorPos) rawlist = [] while raw: @@ -1461,16 +1461,16 @@ class Wtty: # Record the Y offset where the most recent line break was detected self.__bufferY += len(rawlist) - i break - + logger.debug('lastReadData: %r' % self.lastReadData) logger.debug('s: %r' % s) - + if isSamePos and self.lastReadData == s: logger.debug('isSamePos and self.lastReadData == s') s = '' - + logger.debug('s: %r' % s) - + if s: lastReadData = self.lastReadData pos = self.getOffset(self.__currentReadCo) @@ -1491,7 +1491,7 @@ class Wtty: self.lastRead = lastRead else: # Cursor has been repositioned - s = '\r' + s + s = '\r' + s logger.debug('s: %r' % s) self.__buffer.seek(pos) self.__buffer.truncate() @@ -1501,36 +1501,36 @@ class Wtty: self.__currentReadCo.Y = cursorPos.Y return s - - + + def read_nonblocking(self, size): """Reads data from the console if available, otherwise - returns empty string""" - + returns empty string""" + try: self.switchTo() time.sleep(.01) - + if self.__currentReadCo.Y > maxconsoleY: time.sleep(.2) - + s = self.readConsoleToCursor() - + if self.__currentReadCo.Y > maxconsoleY: self.refreshConsole() - + return s - + finally: self.switchBack() - + raise Exception('Unreachable code...') # pragma: no cover - - + + def refreshConsole(self): """Clears the console after pausing the child and reading all the data currently on the console.""" - + orig = win32console.PyCOORDType(0, 0) self.__consout.SetConsoleCursorPosition(orig) self.__currentReadCo.X = 0 @@ -1539,81 +1539,81 @@ class Wtty: # Use NUL as fill char because it displays as whitespace # (if we interact() with the child) self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, writelen, orig) - + self.__bufferY = 0 self.__buffer.truncate(0) - + consinfo = self.__consout.GetConsoleScreenBufferInfo() cursorPos = consinfo['CursorPosition'] logger.debug('refreshConsole: cursorPos %s' % cursorPos) - - + + def setecho(self, state): """Sets the echo mode of the child console. This is a workaround of the setecho. The original GetConsoleMode() / SetConsoleMode() methods didn't work. See git history for the concrete implementation. 2020.01.09 raczben """ - + self.local_echo = state - + def getecho(self): """Returns the echo mode of the child console. This is a workaround of the getecho. The original GetConsoleMode() / SetConsoleMode() methods didn't work. See git history for the concrete implementation. 2020.01.09 raczben """ - + return self.local_echo - + def getwinsize(self): """Returns the size of the child console as a tuple of (rows, columns).""" - + self.switchTo() try: size = self.__consout.GetConsoleScreenBufferInfo()['Size'] finally: self.switchBack() return (size.Y, size.X) - + def setwinsize(self, r, c): """Sets the child console screen buffer size to (r, c).""" - + self.switchTo() try: self.__consout.SetConsoleScreenBufferSize(win32console.PyCOORDType(c, r)) finally: self.switchBack() - + def interact(self): """Displays the child console for interaction.""" - + self.switchTo() try: win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_SHOW) finally: self.switchBack() - + def stop_interact(self): """Hides the child console.""" - + self.switchTo() try: win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_HIDE) finally: self.switchBack() - + def isalive(self, console=False): """True if the child is still alive, false otherwise""" - + if console: return win32process.GetExitCodeProcess(self.__conProcess) == win32con.STILL_ACTIVE else: return win32process.GetExitCodeProcess(self.__childProcess) == win32con.STILL_ACTIVE - + class ConsoleReader: # pragma: no cover - + def __init__(self, path, pid, tid, env = None, cp=None, logdir=None): self.logdir = logdir logger.info('consolepid: {}'.format(os.getpid())) @@ -1633,9 +1633,9 @@ class ConsoleReader: # pragma: no cover try: consout = self.getConsoleOut() self.initConsole(consout) - + si = win32process.GetStartupInfo() - self.__childProcess, _, childPid, self.__tid = win32process.CreateProcess(None, path, None, None, False, + self.__childProcess, _, childPid, self.__tid = win32process.CreateProcess(None, path, None, None, False, 0, None, None, si) logger.info('childPid: {} host_pid: {}'.format(childPid, pid)) except Exception: @@ -1643,62 +1643,62 @@ class ConsoleReader: # pragma: no cover time.sleep(.1) win32api.PostThreadMessage(int(tid), win32con.WM_USER, 0, 0) sys.exit() - + time.sleep(.1) - + win32api.PostThreadMessage(int(tid), win32con.WM_USER, childPid, 0) - + parent = win32api.OpenProcess(win32con.PROCESS_TERMINATE | win32con.PROCESS_QUERY_INFORMATION , 0, int(pid)) paused = False - + while True: consinfo = consout.GetConsoleScreenBufferInfo() cursorPos = consinfo['CursorPosition'] - + if win32process.GetExitCodeProcess(parent) != win32con.STILL_ACTIVE or win32process.GetExitCodeProcess(self.__childProcess) != win32con.STILL_ACTIVE: time.sleep(.1) try: win32process.TerminateProcess(self.__childProcess, 0) except pywintypes.error as e: - # 'Access denied' happens always? Perhaps if not - # running as admin (or UAC enabled under Vista/7). - # Don't log. Child process will exit regardless when + # 'Access denied' happens always? Perhaps if not + # running as admin (or UAC enabled under Vista/7). + # Don't log. Child process will exit regardless when # calling sys.exit if e.args[0] != winerror.ERROR_ACCESS_DENIED: logger.info(e) logger.info('Exiting...') sys.exit() - + if cursorPos.Y > maxconsoleY and not paused: self.suspendThread() paused = True - + if cursorPos.Y <= maxconsoleY and paused: self.resumeThread() paused = False - + time.sleep(.1) except Exception as e: logger.info(e) time.sleep(.1) - - - def handler(self, sig): + + + def handler(self, sig): logger.info(sig) return False def getConsoleOut(self): - consout = win32file.CreateFile('CONOUT$', - win32con.GENERIC_READ | win32con.GENERIC_WRITE, - win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, - None, - win32con.OPEN_EXISTING, - 0, + consout = win32file.CreateFile('CONOUT$', + win32con.GENERIC_READ | win32con.GENERIC_WRITE, + win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, + None, + win32con.OPEN_EXISTING, + 0, 0) - + return win32console.PyConsoleScreenBufferType(consout) - - def initConsole(self, consout): + + def initConsole(self, consout): rect = win32console.PySMALL_RECTType(0, 0, 79, 24) consout.SetConsoleWindowInfo(True, rect) size = win32console.PyCOORDType(80, 16000) @@ -1706,20 +1706,20 @@ class ConsoleReader: # pragma: no cover pos = win32console.PyCOORDType(0, 0) # Use NUL as fill char because it displays as whitespace # (if we interact() with the child) - consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos) - + consout.FillConsoleOutputCharacter(screenbufferfillchar, size.X * size.Y, pos) + def suspendThread(self): """Pauses the main thread of the child process.""" - + handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.__tid) win32process.SuspendThread(handle) - + def resumeThread(self): """Un-pauses the main thread of the child process.""" - + handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.__tid) win32process.ResumeThread(handle) - + class searcher_string (object): """This is a plain string search helper for the spawn.expect_any() method. @@ -1795,7 +1795,7 @@ class searcher_string (object): # rescanning until we've read three more bytes. # # Sadly, I don't know enough about this interesting topic. /grahn - + for index, s in self._strings: if searchwindowsize is None: # the match, if any, can only be in the fresh data, @@ -1874,7 +1874,7 @@ class searcher_re (object): 'buffer' which have not been searched before. See class spawn for the 'searchwindowsize' argument. - + If there is a match this returns the index of that string, and sets 'start', 'end' and 'match'. Otherwise, returns -1.""" diff --git a/wexpect/wexpect_util.py b/wexpect/wexpect_util.py index d714e0f..6fb645c 100644 --- a/wexpect/wexpect_util.py +++ b/wexpect/wexpect_util.py @@ -1,7 +1,7 @@ """Wexpect is a Windows variant of pexpect https://pexpect.readthedocs.io. Wexpect is a Python module for spawning child applications and controlling -them automatically. +them automatically. wexpect util contains small functions, and classes, which are used in multiple classes. The command line argument parsers, and the Exceptions placed here. @@ -48,18 +48,23 @@ import signal EOF_CHAR = b'\x04' SIGNAL_CHARS = { - signal.SIGTERM: b'\x011', # Device control 1 - signal.SIGINT: b'\x012', # Device control 2 - } + signal.SIGTERM: b'\x011', # Device control 1 + signal.SIGINT: b'\x012', # Device control 2 +} SPAM = 5 logging.addLevelName(SPAM, "SPAM") + + def spam(self, message, *args, **kws): if self.isEnabledFor(SPAM): # Yes, logger takes its '*args' as 'args'. self._log(SPAM, message, args, **kws) + + logging.Logger.spam = spam + def init_logger(logger): try: logger_level = os.environ['WEXPECT_LOGGER_LEVEL'] @@ -72,13 +77,15 @@ def init_logger(logger): logger_filename = f'{logger_filename}.log' os.makedirs(os.path.dirname(logger_filename), exist_ok=True) fh = logging.FileHandler(logger_filename, 'w', 'utf-8') - formatter = logging.Formatter('%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s') + formatter = logging.Formatter( + '%(asctime)s - %(filename)s:%(lineno)d - %(levelname)s - %(message)s') fh.setFormatter(formatter) logger.addHandler(fh) except KeyError: logger.setLevel(logging.ERROR) -def split_command_line(command_line, escape_char = '^'): + +def split_command_line(command_line, escape_char='^'): """This splits a command line into a list of arguments. It splits arguments on spaces, but handles embedded quotes, doublequotes, and escaped characters. It's impossible to do this with a regular expression, so I @@ -92,21 +99,21 @@ def split_command_line(command_line, escape_char = '^'): state_esc = 1 state_singlequote = 2 state_doublequote = 3 - state_whitespace = 4 # The state of consuming whitespace between commands. + state_whitespace = 4 # The state of consuming whitespace between commands. state = state_basic for c in command_line: if state == state_basic or state == state_whitespace: - if c == escape_char: # Escape the next character + if c == escape_char: # Escape the next character state = state_esc - elif c == r"'": # Handle single quote + elif c == r"'": # Handle single quote state = state_singlequote - elif c == r'"': # Handle double quote + elif c == r'"': # Handle double quote state = state_doublequote elif c.isspace(): # Add arg to arg_list if we aren't in the middle of whitespace. if state == state_whitespace: - None # Do nothing. + None # Do nothing. else: arg_list.append(arg) arg = '' @@ -132,8 +139,9 @@ def split_command_line(command_line, escape_char = '^'): arg_list.append(arg) return arg_list + def join_args(args): - """Joins arguments into a command line. It quotes all arguments that contain + """Joins arguments a command line. It quotes all arguments that contain spaces or any of the characters ^!$%&()[]{}=;'+,`~""" commandline = [] for arg in args: @@ -178,6 +186,6 @@ class EOF(ExceptionPexpect): """Raised when EOF is read from a child. This usually means the child has exited. The user can wait to EOF, which means he waits the end of the execution of the child process.""" + class TIMEOUT(ExceptionPexpect): """Raised when a read time exceeds the timeout. """ -