mirror of
https://github.com/clearml/wexpect-venv
synced 2025-06-26 18:15:52 +00:00
minimal 17
This commit is contained in:
parent
58bc45665d
commit
40f6d8f8e7
@ -668,381 +668,11 @@ class Wtty:
|
||||
logger.info(f'self.conpid: {self.conpid}')
|
||||
logger.info(f'self.__otid: {self.__otid}')
|
||||
|
||||
def switchTo(self, attatched=True):
|
||||
"""Releases from the current console and attatches
|
||||
to the childs."""
|
||||
|
||||
if not self.__switch:
|
||||
return
|
||||
|
||||
try:
|
||||
# No 'attached' check is needed, FreeConsole() can be called multiple times.
|
||||
win32console.FreeConsole()
|
||||
# This is the workaround for #14. The #14 will still occure if the child process
|
||||
# finishes between this `isalive()` check and `AttachConsole(self.conpid)`. (However the
|
||||
# risk is low.)
|
||||
if not self.isalive(console=True):
|
||||
# When child has finished...
|
||||
logger.info('EOF: End Of File (EOF) in switchTo().')
|
||||
raise EOF('End Of File (EOF) in switchTo().')
|
||||
|
||||
win32console.AttachConsole(self.conpid)
|
||||
self.__consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE)
|
||||
self.__consout = self.getConsoleOut()
|
||||
|
||||
except pywintypes.error as e:
|
||||
# pywintypes.error: (5, 'AttachConsole', 'Access is denied.')
|
||||
# When child has finished...
|
||||
logging.info(e)
|
||||
# In case of any error: We "switch back" (attach) our original console, then raise the
|
||||
# error.
|
||||
self.switchBack()
|
||||
logger.info('EOF: End Of File (EOF) in switchTo().')
|
||||
raise EOF('End Of File (EOF) in switchTo().')
|
||||
except:
|
||||
# In case of any error: We "switch back" (attach) our original console, then raise the
|
||||
# error.
|
||||
self.switchBack()
|
||||
logger.info(traceback.format_exc())
|
||||
raise
|
||||
|
||||
|
||||
def switchBack(self):
|
||||
"""Releases from the current console and attaches
|
||||
to the parents."""
|
||||
|
||||
if not self.__switch:
|
||||
return
|
||||
|
||||
if self.console:
|
||||
# If we originally had a console, re-attach it (or allocate a new one)
|
||||
# If we didn't have a console to begin with, there's no need to
|
||||
# re-attach/allocate
|
||||
win32console.FreeConsole()
|
||||
if len(self.processList) > 1:
|
||||
# Our original console is still present, re-attach
|
||||
win32console.AttachConsole(self.__parentPid)
|
||||
else:
|
||||
# Our original console has been free'd, allocate a new one
|
||||
win32console.AllocConsole()
|
||||
|
||||
self.__consin = None
|
||||
self.__consout = None
|
||||
|
||||
def getConsoleOut(self):
|
||||
consout = win32file.CreateFile('CONOUT$',
|
||||
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
|
||||
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
|
||||
None,
|
||||
win32con.OPEN_EXISTING,
|
||||
0,
|
||||
0)
|
||||
|
||||
return win32console.PyConsoleScreenBufferType(consout)
|
||||
|
||||
def getchild(self):
|
||||
"""Returns a handle to the child process."""
|
||||
|
||||
return self.__childProcess
|
||||
|
||||
def terminate_child(self):
|
||||
"""Terminate the child process."""
|
||||
win32api.TerminateProcess(self.__childProcess, 1)
|
||||
# win32api.win32process.TerminateProcess(self.__childProcess, 1)
|
||||
|
||||
def createKeyEvent(self, char):
|
||||
"""Creates a single key record corrosponding to
|
||||
the ascii character char."""
|
||||
|
||||
evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT)
|
||||
evt.KeyDown = True
|
||||
evt.Char = char
|
||||
evt.RepeatCount = 1
|
||||
return evt
|
||||
|
||||
def write(self, s):
|
||||
"""Writes input into the child consoles input buffer."""
|
||||
|
||||
if len(s) == 0:
|
||||
return 0
|
||||
self.switchTo()
|
||||
try:
|
||||
if s[-1] == '\n':
|
||||
s = s[:-1]
|
||||
records = [self.createKeyEvent(c) for c in str(s)]
|
||||
if not self.__consout:
|
||||
return ""
|
||||
|
||||
# Store the current cursor position to hide characters in local echo disabled mode (workaround).
|
||||
consinfo = self.__consout.GetConsoleScreenBufferInfo()
|
||||
startCo = consinfo['CursorPosition']
|
||||
|
||||
# Send the string to console input
|
||||
wrote = self.__consin.WriteConsoleInput(records)
|
||||
|
||||
# Wait until all input has been recorded by the console.
|
||||
ts = time.time()
|
||||
while self.__consin.PeekConsoleInput(8) != ():
|
||||
if time.time() > ts + len(s) * .1 + .5:
|
||||
break
|
||||
time.sleep(.05)
|
||||
|
||||
# Hide characters in local echo disabled mode (workaround).
|
||||
if not self.local_echo:
|
||||
self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, len(s), startCo)
|
||||
|
||||
return wrote
|
||||
finally:
|
||||
self.switchBack()
|
||||
|
||||
def getCoord(self, offset):
|
||||
"""Converts an offset to a point represented as a tuple."""
|
||||
|
||||
x = offset % self.__consSize[0]
|
||||
y = offset // self.__consSize[0]
|
||||
return win32console.PyCOORDType(x, y)
|
||||
|
||||
def getOffset(self, coord):
|
||||
"""Converts a tuple-point to an offset."""
|
||||
|
||||
return coord.X + coord.Y * self.__consSize[0]
|
||||
|
||||
def readConsole(self, startCo, endCo):
|
||||
"""Reads the console area from startCo to endCo and returns it
|
||||
as a string."""
|
||||
|
||||
buff = []
|
||||
self.lastRead = 0
|
||||
|
||||
while True:
|
||||
startOff = self.getOffset(startCo)
|
||||
endOff = self.getOffset(endCo)
|
||||
readlen = endOff - startOff
|
||||
|
||||
if readlen <= 0:
|
||||
break
|
||||
|
||||
if readlen > 4000:
|
||||
readlen = 4000
|
||||
endPoint = self.getCoord(startOff + readlen)
|
||||
|
||||
s = self.__consout.ReadConsoleOutputCharacter(readlen, startCo)
|
||||
self.lastRead += len(s)
|
||||
self.totalRead += len(s)
|
||||
buff.append(s)
|
||||
|
||||
startCo = endPoint
|
||||
|
||||
return ''.join(buff)
|
||||
|
||||
def parseData(self, s):
|
||||
"""Ensures that special characters are interpretted as
|
||||
newlines or blanks, depending on if there written over
|
||||
characters or screen-buffer-fill characters."""
|
||||
|
||||
strlist = []
|
||||
for i, c in enumerate(s):
|
||||
if c == screenbufferfillchar:
|
||||
if (self.totalRead - self.lastRead + i + 1) % self.__consSize[0] == 0:
|
||||
strlist.append('\r\n')
|
||||
else:
|
||||
strlist.append(c)
|
||||
|
||||
s = ''.join(strlist)
|
||||
return s
|
||||
|
||||
|
||||
def readConsoleToCursor(self):
|
||||
"""Reads from the current read position to the current cursor
|
||||
position and inserts the string into self.__buffer."""
|
||||
|
||||
if not self.__consout:
|
||||
return ""
|
||||
|
||||
consinfo = self.__consout.GetConsoleScreenBufferInfo()
|
||||
cursorPos = consinfo['CursorPosition']
|
||||
|
||||
logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo))
|
||||
|
||||
isSameX = cursorPos.X == self.__currentReadCo.X
|
||||
isSameY = cursorPos.Y == self.__currentReadCo.Y
|
||||
isSamePos = isSameX and isSameY
|
||||
|
||||
logger.debug('isSameY: %r' % isSameY)
|
||||
logger.debug('isSamePos: %r' % isSamePos)
|
||||
|
||||
if isSameY or not self.lastReadData.endswith('\r\n'):
|
||||
# Read the current slice again
|
||||
self.totalRead -= self.lastRead
|
||||
self.__currentReadCo.X = 0
|
||||
self.__currentReadCo.Y = self.__bufferY
|
||||
|
||||
logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo))
|
||||
|
||||
raw = self.readConsole(self.__currentReadCo, cursorPos)
|
||||
rawlist = []
|
||||
while raw:
|
||||
rawlist.append(raw[:self.__consSize[0]])
|
||||
raw = raw[self.__consSize[0]:]
|
||||
raw = ''.join(rawlist)
|
||||
s = self.parseData(raw)
|
||||
logger.debug(s)
|
||||
for i, line in enumerate(reversed(rawlist)):
|
||||
if line.endswith(screenbufferfillchar):
|
||||
# Record the Y offset where the most recent line break was detected
|
||||
self.__bufferY += len(rawlist) - i
|
||||
break
|
||||
|
||||
logger.debug('lastReadData: %r' % self.lastReadData)
|
||||
logger.debug('s: %r' % s)
|
||||
|
||||
if isSamePos and self.lastReadData == s:
|
||||
logger.debug('isSamePos and self.lastReadData == s')
|
||||
s = ''
|
||||
|
||||
logger.debug('s: %r' % s)
|
||||
|
||||
if s:
|
||||
lastReadData = self.lastReadData
|
||||
pos = self.getOffset(self.__currentReadCo)
|
||||
self.lastReadData = s
|
||||
if isSameY or not lastReadData.endswith('\r\n'):
|
||||
# Detect changed lines
|
||||
self.__buffer.seek(pos)
|
||||
buf = self.__buffer.read()
|
||||
logger.debug('buf: %r' % buf)
|
||||
logger.debug('raw: %r' % raw)
|
||||
if raw.startswith(buf):
|
||||
# Line has grown
|
||||
rawslice = raw[len(buf):]
|
||||
# Update last read bytes so line breaks can be detected in parseData
|
||||
lastRead = self.lastRead
|
||||
self.lastRead = len(rawslice)
|
||||
s = self.parseData(rawslice)
|
||||
self.lastRead = lastRead
|
||||
else:
|
||||
# Cursor has been repositioned
|
||||
s = '\r' + s
|
||||
logger.debug('s: %r' % s)
|
||||
self.__buffer.seek(pos)
|
||||
self.__buffer.truncate()
|
||||
self.__buffer.write(raw)
|
||||
|
||||
self.__currentReadCo.X = cursorPos.X
|
||||
self.__currentReadCo.Y = cursorPos.Y
|
||||
|
||||
return s
|
||||
|
||||
|
||||
def read_nonblocking(self, size):
|
||||
"""Reads data from the console if available, otherwise
|
||||
returns empty string"""
|
||||
|
||||
try:
|
||||
self.switchTo()
|
||||
time.sleep(.01)
|
||||
|
||||
if self.__currentReadCo.Y > maxconsoleY:
|
||||
time.sleep(.2)
|
||||
|
||||
s = self.readConsoleToCursor()
|
||||
|
||||
if self.__currentReadCo.Y > maxconsoleY:
|
||||
self.refreshConsole()
|
||||
|
||||
return s
|
||||
|
||||
finally:
|
||||
self.switchBack()
|
||||
|
||||
raise Exception('Unreachable code...') # pragma: no cover
|
||||
|
||||
|
||||
def refreshConsole(self):
|
||||
"""Clears the console after pausing the child and
|
||||
reading all the data currently on the console."""
|
||||
|
||||
orig = win32console.PyCOORDType(0, 0)
|
||||
self.__consout.SetConsoleCursorPosition(orig)
|
||||
self.__currentReadCo.X = 0
|
||||
self.__currentReadCo.Y = 0
|
||||
writelen = self.__consSize[0] * self.__consSize[1]
|
||||
# Use NUL as fill char because it displays as whitespace
|
||||
# (if we interact() with the child)
|
||||
self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, writelen, orig)
|
||||
|
||||
self.__bufferY = 0
|
||||
self.__buffer.truncate(0)
|
||||
|
||||
consinfo = self.__consout.GetConsoleScreenBufferInfo()
|
||||
cursorPos = consinfo['CursorPosition']
|
||||
logger.debug('refreshConsole: cursorPos %s' % cursorPos)
|
||||
|
||||
|
||||
def setecho(self, state):
|
||||
"""Sets the echo mode of the child console.
|
||||
This is a workaround of the setecho. The original GetConsoleMode() / SetConsoleMode()
|
||||
methods didn't work. See git history for the concrete implementation.
|
||||
2020.01.09 raczben
|
||||
"""
|
||||
|
||||
self.local_echo = state
|
||||
|
||||
def getecho(self):
|
||||
"""Returns the echo mode of the child console.
|
||||
This is a workaround of the getecho. The original GetConsoleMode() / SetConsoleMode()
|
||||
methods didn't work. See git history for the concrete implementation.
|
||||
2020.01.09 raczben
|
||||
"""
|
||||
|
||||
return self.local_echo
|
||||
|
||||
def getwinsize(self):
|
||||
"""Returns the size of the child console as a tuple of
|
||||
(rows, columns)."""
|
||||
|
||||
self.switchTo()
|
||||
try:
|
||||
size = self.__consout.GetConsoleScreenBufferInfo()['Size']
|
||||
finally:
|
||||
self.switchBack()
|
||||
return (size.Y, size.X)
|
||||
|
||||
def setwinsize(self, r, c):
|
||||
"""Sets the child console screen buffer size to (r, c)."""
|
||||
|
||||
self.switchTo()
|
||||
try:
|
||||
self.__consout.SetConsoleScreenBufferSize(win32console.PyCOORDType(c, r))
|
||||
finally:
|
||||
self.switchBack()
|
||||
|
||||
def interact(self):
|
||||
"""Displays the child console for interaction."""
|
||||
|
||||
self.switchTo()
|
||||
try:
|
||||
win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_SHOW)
|
||||
finally:
|
||||
self.switchBack()
|
||||
|
||||
def stop_interact(self):
|
||||
"""Hides the child console."""
|
||||
|
||||
self.switchTo()
|
||||
try:
|
||||
win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_HIDE)
|
||||
finally:
|
||||
self.switchBack()
|
||||
|
||||
def isalive(self, console=True):
|
||||
"""True if the child is still alive, false otherwise"""
|
||||
|
||||
if console:
|
||||
return win32process.GetExitCodeProcess(self.__conProcess) == win32con.STILL_ACTIVE
|
||||
else:
|
||||
return win32process.GetExitCodeProcess(self.__childProcess) == win32con.STILL_ACTIVE
|
||||
|
||||
class ConsoleReader: # pragma: no cover
|
||||
|
||||
def __init__(self, path, pid, tid, env = None, cp=None, logdir=None):
|
||||
|
Loading…
Reference in New Issue
Block a user