diff --git a/tests/parametric_printer.py b/tests/parametric_printer.py new file mode 100644 index 0000000..f99b099 --- /dev/null +++ b/tests/parametric_printer.py @@ -0,0 +1,32 @@ +''' +This is is a very basic stdio handler script. This is used by python.py example. +''' + +import time +import sys + +# Read an integer from the user: + +while True: + print('Format:character,character_count,line_count,speed_ms> ', end='') + command = input() + (character,character_count,line_count,speed_ms) = command.split(',') + character_count = int(character_count) + speed_ms = int(speed_ms) + line_count = int(line_count) + + if line_count<1: + sys.exit(0) + for _ in range(line_count): + if speed_ms<0: + print(character*character_count) + sys.stdout.flush() + else: + for i in range(character_count): + if i == character_count-1: + print(character) + sys.stdout.flush() + else: + print(character, end='') + sys.stdout.flush() + time.sleep(speed_ms/1000) diff --git a/tests/test_parametric_printer.py b/tests/test_parametric_printer.py new file mode 100644 index 0000000..25cd5b1 --- /dev/null +++ b/tests/test_parametric_printer.py @@ -0,0 +1,86 @@ +import wexpect +import unittest +import sys +import os +import time +from tests import PexpectTestCase + +class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase): + def test_all_line_length (self): + + here = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, here) + + # With quotes (C:\Program Files\Python37\python.exe needs quotes) + python_executable = '"' + sys.executable + '" ' + child_script = here + '\\parametric_printer.py' + + self.prompt = '> ' + + # Start the child process + self.p = wexpect.spawn(python_executable + ' ' + child_script) + # Wait for prompt + self.p.expect(self.prompt) + + self._test(['a'], range(1,200), [1], [0]) + + + def test_random(self): + + here = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, here) + + # With quotes (C:\Program Files\Python37\python.exe needs quotes) + python_executable = '"' + sys.executable + '" ' + child_script = here + '\\parametric_printer.py' + + self.prompt = '> ' + + # Start the child process + self.p = wexpect.spawn(python_executable + ' ' + child_script) + # Wait for prompt + self.p.expect(self.prompt) + + self._test(['a', 'b', 'c'], [1, 2, 4, 8], [1, 2, 4, 8], [-1, 0, 1, 2]) + self._test(['a', 'b', 'c'], [16], [16], [-1, 0, 1]) + self._test(['a', 'b', 'c'], [16, 32, 64], [16, 32, 64], [-1, 0]) + + def test_long_console(self): + + here = os.path.dirname(os.path.abspath(__file__)) + sys.path.insert(0, here) + + # With quotes (C:\Program Files\Python37\python.exe needs quotes) + python_executable = '"' + sys.executable + '" ' + child_script = here + '\\parametric_printer.py' + + self.prompt = '> ' + + # Start the child process + self.p = wexpect.spawn(python_executable + ' ' + child_script) + # Wait for prompt + self.p.expect(self.prompt) + + self._test(['a', 'b', 'c', 'd', 'e', 'f'], [8, 16, 32, 64], [64, 128, 256], [-1, 0]) + + def _test(self, character_list, character_count_list, line_count_list, speed_ms_list): + + # print(f'character_list: {character_list} character_count_list: {character_count_list} line_count_list: {line_count_list} speed_ms_list: {speed_ms_list}') + for character in character_list: + for character_count in character_count_list: + for line_count in line_count_list: + for speed_ms in speed_ms_list: + command = f'{character},{character_count},{line_count},{speed_ms}' + self.p.sendline(command) + self.p.expect(self.prompt) + expected = [character*character_count] * line_count + try: + self.assertEqual(self.p.before.splitlines()[1:-1], expected) + except: + time.sleep(5) + raise + +if __name__ == '__main__': + unittest.main() + +suite = unittest.makeSuite(TestCaseParametricPrinter,'test') diff --git a/wexpect/console_reader.py b/wexpect/console_reader.py index 4dd532d..ffde67d 100644 --- a/wexpect/console_reader.py +++ b/wexpect/console_reader.py @@ -96,7 +96,7 @@ class ConsoleReaderBase: try: self.initConsole() si = win32process.GetStartupInfo() - self.__childProcess, _, self.child_pid, self.__tid = win32process.CreateProcess( + self.__childProcess, _, self.child_pid, self.child_tid = win32process.CreateProcess( None, path, None, None, False, 0, None, None, si) self.child_process = psutil.Process(self.child_pid) @@ -129,7 +129,6 @@ class ConsoleReaderBase: logger.error(traceback.format_exc()) def read_loop(self): - paused = False while True: if not self.isalive(self.host_process): @@ -143,7 +142,20 @@ class ConsoleReaderBase: consinfo = self.consout.GetConsoleScreenBufferInfo() cursorPos = consinfo['CursorPosition'] - self.send_to_host(self.readConsoleToCursor()) + + if cursorPos.Y > maxconsoleY: + '''If the console output becomes long, we suspend the child, read all output then + clear the console before we resume the child. + ''' + logger.info('cursorPos %s' % cursorPos) + self.suspend_child() + time.sleep(.2) + self.send_to_host(self.readConsoleToCursor()) + self.refresh_console() + self.resume_child() + else: + self.send_to_host(self.readConsoleToCursor()) + s = self.get_from_host() if s: logger.debug(f'get_from_host: {s}') @@ -156,18 +168,35 @@ class ConsoleReaderBase: s = s.decode() self.write(s) - if cursorPos.Y > maxconsoleY and not paused: - logger.info('cursorPos %s' % cursorPos) - self.suspendThread() - paused = True - - if cursorPos.Y <= maxconsoleY and paused: - logger.info('cursorPos %s' % cursorPos) - self.resumeThread() - paused = False time.sleep(.02) + def suspend_child(self): + """Pauses the main thread of the child process.""" + handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.child_tid) + win32process.SuspendThread(handle) + + def resume_child(self): + """Un-pauses the main thread of the child process.""" + handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.child_tid) + win32process.ResumeThread(handle) + + def refresh_console(self): + """Clears the console after pausing the child and + reading all the data currently on the console.""" + + orig = win32console.PyCOORDType(0, 0) + self.consout.SetConsoleCursorPosition(orig) + self.__currentReadCo.X = 0 + self.__currentReadCo.Y = 0 + writelen = self.__consSize.X * self.__consSize.Y + # Use NUL as fill char because it displays as whitespace + # (if we interact() with the child) + self.consout.FillConsoleOutputCharacter(screenbufferfillchar, writelen, orig) + + self.__bufferY = 0 + self.__buffer.truncate(0) + def terminate_child(self): try: if self.child_process: