mirror of
https://github.com/clearml/wexpect-venv
synced 2025-04-25 08:44:09 +00:00
[FIX] Infinite length console: #35
This commit is contained in:
parent
a3acabda73
commit
524cf0e7c3
32
tests/parametric_printer.py
Normal file
32
tests/parametric_printer.py
Normal file
@ -0,0 +1,32 @@
|
||||
'''
|
||||
This is is a very basic stdio handler script. This is used by python.py example.
|
||||
'''
|
||||
|
||||
import time
|
||||
import sys
|
||||
|
||||
# Read an integer from the user:
|
||||
|
||||
while True:
|
||||
print('Format:character,character_count,line_count,speed_ms> ', end='')
|
||||
command = input()
|
||||
(character,character_count,line_count,speed_ms) = command.split(',')
|
||||
character_count = int(character_count)
|
||||
speed_ms = int(speed_ms)
|
||||
line_count = int(line_count)
|
||||
|
||||
if line_count<1:
|
||||
sys.exit(0)
|
||||
for _ in range(line_count):
|
||||
if speed_ms<0:
|
||||
print(character*character_count)
|
||||
sys.stdout.flush()
|
||||
else:
|
||||
for i in range(character_count):
|
||||
if i == character_count-1:
|
||||
print(character)
|
||||
sys.stdout.flush()
|
||||
else:
|
||||
print(character, end='')
|
||||
sys.stdout.flush()
|
||||
time.sleep(speed_ms/1000)
|
86
tests/test_parametric_printer.py
Normal file
86
tests/test_parametric_printer.py
Normal file
@ -0,0 +1,86 @@
|
||||
import wexpect
|
||||
import unittest
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
from tests import PexpectTestCase
|
||||
|
||||
class TestCaseParametricPrinter(PexpectTestCase.PexpectTestCase):
|
||||
def test_all_line_length (self):
|
||||
|
||||
here = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.insert(0, here)
|
||||
|
||||
# With quotes (C:\Program Files\Python37\python.exe needs quotes)
|
||||
python_executable = '"' + sys.executable + '" '
|
||||
child_script = here + '\\parametric_printer.py'
|
||||
|
||||
self.prompt = '> '
|
||||
|
||||
# Start the child process
|
||||
self.p = wexpect.spawn(python_executable + ' ' + child_script)
|
||||
# Wait for prompt
|
||||
self.p.expect(self.prompt)
|
||||
|
||||
self._test(['a'], range(1,200), [1], [0])
|
||||
|
||||
|
||||
def test_random(self):
|
||||
|
||||
here = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.insert(0, here)
|
||||
|
||||
# With quotes (C:\Program Files\Python37\python.exe needs quotes)
|
||||
python_executable = '"' + sys.executable + '" '
|
||||
child_script = here + '\\parametric_printer.py'
|
||||
|
||||
self.prompt = '> '
|
||||
|
||||
# Start the child process
|
||||
self.p = wexpect.spawn(python_executable + ' ' + child_script)
|
||||
# Wait for prompt
|
||||
self.p.expect(self.prompt)
|
||||
|
||||
self._test(['a', 'b', 'c'], [1, 2, 4, 8], [1, 2, 4, 8], [-1, 0, 1, 2])
|
||||
self._test(['a', 'b', 'c'], [16], [16], [-1, 0, 1])
|
||||
self._test(['a', 'b', 'c'], [16, 32, 64], [16, 32, 64], [-1, 0])
|
||||
|
||||
def test_long_console(self):
|
||||
|
||||
here = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.insert(0, here)
|
||||
|
||||
# With quotes (C:\Program Files\Python37\python.exe needs quotes)
|
||||
python_executable = '"' + sys.executable + '" '
|
||||
child_script = here + '\\parametric_printer.py'
|
||||
|
||||
self.prompt = '> '
|
||||
|
||||
# Start the child process
|
||||
self.p = wexpect.spawn(python_executable + ' ' + child_script)
|
||||
# Wait for prompt
|
||||
self.p.expect(self.prompt)
|
||||
|
||||
self._test(['a', 'b', 'c', 'd', 'e', 'f'], [8, 16, 32, 64], [64, 128, 256], [-1, 0])
|
||||
|
||||
def _test(self, character_list, character_count_list, line_count_list, speed_ms_list):
|
||||
|
||||
# print(f'character_list: {character_list} character_count_list: {character_count_list} line_count_list: {line_count_list} speed_ms_list: {speed_ms_list}')
|
||||
for character in character_list:
|
||||
for character_count in character_count_list:
|
||||
for line_count in line_count_list:
|
||||
for speed_ms in speed_ms_list:
|
||||
command = f'{character},{character_count},{line_count},{speed_ms}'
|
||||
self.p.sendline(command)
|
||||
self.p.expect(self.prompt)
|
||||
expected = [character*character_count] * line_count
|
||||
try:
|
||||
self.assertEqual(self.p.before.splitlines()[1:-1], expected)
|
||||
except:
|
||||
time.sleep(5)
|
||||
raise
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
suite = unittest.makeSuite(TestCaseParametricPrinter,'test')
|
@ -96,7 +96,7 @@ class ConsoleReaderBase:
|
||||
try:
|
||||
self.initConsole()
|
||||
si = win32process.GetStartupInfo()
|
||||
self.__childProcess, _, self.child_pid, self.__tid = win32process.CreateProcess(
|
||||
self.__childProcess, _, self.child_pid, self.child_tid = win32process.CreateProcess(
|
||||
None, path, None, None, False, 0, None, None, si)
|
||||
self.child_process = psutil.Process(self.child_pid)
|
||||
|
||||
@ -129,7 +129,6 @@ class ConsoleReaderBase:
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
def read_loop(self):
|
||||
paused = False
|
||||
|
||||
while True:
|
||||
if not self.isalive(self.host_process):
|
||||
@ -143,7 +142,20 @@ class ConsoleReaderBase:
|
||||
|
||||
consinfo = self.consout.GetConsoleScreenBufferInfo()
|
||||
cursorPos = consinfo['CursorPosition']
|
||||
self.send_to_host(self.readConsoleToCursor())
|
||||
|
||||
if cursorPos.Y > maxconsoleY:
|
||||
'''If the console output becomes long, we suspend the child, read all output then
|
||||
clear the console before we resume the child.
|
||||
'''
|
||||
logger.info('cursorPos %s' % cursorPos)
|
||||
self.suspend_child()
|
||||
time.sleep(.2)
|
||||
self.send_to_host(self.readConsoleToCursor())
|
||||
self.refresh_console()
|
||||
self.resume_child()
|
||||
else:
|
||||
self.send_to_host(self.readConsoleToCursor())
|
||||
|
||||
s = self.get_from_host()
|
||||
if s:
|
||||
logger.debug(f'get_from_host: {s}')
|
||||
@ -156,18 +168,35 @@ class ConsoleReaderBase:
|
||||
s = s.decode()
|
||||
self.write(s)
|
||||
|
||||
if cursorPos.Y > maxconsoleY and not paused:
|
||||
logger.info('cursorPos %s' % cursorPos)
|
||||
self.suspendThread()
|
||||
paused = True
|
||||
|
||||
if cursorPos.Y <= maxconsoleY and paused:
|
||||
logger.info('cursorPos %s' % cursorPos)
|
||||
self.resumeThread()
|
||||
paused = False
|
||||
|
||||
time.sleep(.02)
|
||||
|
||||
def suspend_child(self):
|
||||
"""Pauses the main thread of the child process."""
|
||||
handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.child_tid)
|
||||
win32process.SuspendThread(handle)
|
||||
|
||||
def resume_child(self):
|
||||
"""Un-pauses the main thread of the child process."""
|
||||
handle = windll.kernel32.OpenThread(win32con.THREAD_SUSPEND_RESUME, 0, self.child_tid)
|
||||
win32process.ResumeThread(handle)
|
||||
|
||||
def refresh_console(self):
|
||||
"""Clears the console after pausing the child and
|
||||
reading all the data currently on the console."""
|
||||
|
||||
orig = win32console.PyCOORDType(0, 0)
|
||||
self.consout.SetConsoleCursorPosition(orig)
|
||||
self.__currentReadCo.X = 0
|
||||
self.__currentReadCo.Y = 0
|
||||
writelen = self.__consSize.X * self.__consSize.Y
|
||||
# Use NUL as fill char because it displays as whitespace
|
||||
# (if we interact() with the child)
|
||||
self.consout.FillConsoleOutputCharacter(screenbufferfillchar, writelen, orig)
|
||||
|
||||
self.__bufferY = 0
|
||||
self.__buffer.truncate(0)
|
||||
|
||||
def terminate_child(self):
|
||||
try:
|
||||
if self.child_process:
|
||||
|
Loading…
Reference in New Issue
Block a user