From 40f6d8f8e774d08264c75d0e9b0453554f858b7c Mon Sep 17 00:00:00 2001 From: Benedek Racz Date: Tue, 4 Feb 2020 10:36:08 +0100 Subject: [PATCH] minimal 17 --- wexpect/legacy_wexpect.py | 370 -------------------------------------- 1 file changed, 370 deletions(-) diff --git a/wexpect/legacy_wexpect.py b/wexpect/legacy_wexpect.py index cb7ea27..db88412 100644 --- a/wexpect/legacy_wexpect.py +++ b/wexpect/legacy_wexpect.py @@ -668,381 +668,11 @@ class Wtty: logger.info(f'self.conpid: {self.conpid}') logger.info(f'self.__otid: {self.__otid}') - def switchTo(self, attatched=True): - """Releases from the current console and attatches - to the childs.""" - - if not self.__switch: - return - - try: - # No 'attached' check is needed, FreeConsole() can be called multiple times. - win32console.FreeConsole() - # This is the workaround for #14. The #14 will still occure if the child process - # finishes between this `isalive()` check and `AttachConsole(self.conpid)`. (However the - # risk is low.) - if not self.isalive(console=True): - # When child has finished... - logger.info('EOF: End Of File (EOF) in switchTo().') - raise EOF('End Of File (EOF) in switchTo().') - - win32console.AttachConsole(self.conpid) - self.__consin = win32console.GetStdHandle(win32console.STD_INPUT_HANDLE) - self.__consout = self.getConsoleOut() - - except pywintypes.error as e: - # pywintypes.error: (5, 'AttachConsole', 'Access is denied.') - # When child has finished... - logging.info(e) - # In case of any error: We "switch back" (attach) our original console, then raise the - # error. - self.switchBack() - logger.info('EOF: End Of File (EOF) in switchTo().') - raise EOF('End Of File (EOF) in switchTo().') - except: - # In case of any error: We "switch back" (attach) our original console, then raise the - # error. - self.switchBack() - logger.info(traceback.format_exc()) - raise - - - def switchBack(self): - """Releases from the current console and attaches - to the parents.""" - - if not self.__switch: - return - - if self.console: - # If we originally had a console, re-attach it (or allocate a new one) - # If we didn't have a console to begin with, there's no need to - # re-attach/allocate - win32console.FreeConsole() - if len(self.processList) > 1: - # Our original console is still present, re-attach - win32console.AttachConsole(self.__parentPid) - else: - # Our original console has been free'd, allocate a new one - win32console.AllocConsole() - - self.__consin = None - self.__consout = None - - def getConsoleOut(self): - consout = win32file.CreateFile('CONOUT$', - win32con.GENERIC_READ | win32con.GENERIC_WRITE, - win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE, - None, - win32con.OPEN_EXISTING, - 0, - 0) - - return win32console.PyConsoleScreenBufferType(consout) - - def getchild(self): - """Returns a handle to the child process.""" - - return self.__childProcess - def terminate_child(self): """Terminate the child process.""" win32api.TerminateProcess(self.__childProcess, 1) # win32api.win32process.TerminateProcess(self.__childProcess, 1) - def createKeyEvent(self, char): - """Creates a single key record corrosponding to - the ascii character char.""" - - evt = win32console.PyINPUT_RECORDType(win32console.KEY_EVENT) - evt.KeyDown = True - evt.Char = char - evt.RepeatCount = 1 - return evt - - def write(self, s): - """Writes input into the child consoles input buffer.""" - - if len(s) == 0: - return 0 - self.switchTo() - try: - if s[-1] == '\n': - s = s[:-1] - records = [self.createKeyEvent(c) for c in str(s)] - if not self.__consout: - return "" - - # Store the current cursor position to hide characters in local echo disabled mode (workaround). - consinfo = self.__consout.GetConsoleScreenBufferInfo() - startCo = consinfo['CursorPosition'] - - # Send the string to console input - wrote = self.__consin.WriteConsoleInput(records) - - # Wait until all input has been recorded by the console. - ts = time.time() - while self.__consin.PeekConsoleInput(8) != (): - if time.time() > ts + len(s) * .1 + .5: - break - time.sleep(.05) - - # Hide characters in local echo disabled mode (workaround). - if not self.local_echo: - self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, len(s), startCo) - - return wrote - finally: - self.switchBack() - - def getCoord(self, offset): - """Converts an offset to a point represented as a tuple.""" - - x = offset % self.__consSize[0] - y = offset // self.__consSize[0] - return win32console.PyCOORDType(x, y) - - def getOffset(self, coord): - """Converts a tuple-point to an offset.""" - - return coord.X + coord.Y * self.__consSize[0] - - def readConsole(self, startCo, endCo): - """Reads the console area from startCo to endCo and returns it - as a string.""" - - buff = [] - self.lastRead = 0 - - while True: - startOff = self.getOffset(startCo) - endOff = self.getOffset(endCo) - readlen = endOff - startOff - - if readlen <= 0: - break - - if readlen > 4000: - readlen = 4000 - endPoint = self.getCoord(startOff + readlen) - - s = self.__consout.ReadConsoleOutputCharacter(readlen, startCo) - self.lastRead += len(s) - self.totalRead += len(s) - buff.append(s) - - startCo = endPoint - - return ''.join(buff) - - def parseData(self, s): - """Ensures that special characters are interpretted as - newlines or blanks, depending on if there written over - characters or screen-buffer-fill characters.""" - - strlist = [] - for i, c in enumerate(s): - if c == screenbufferfillchar: - if (self.totalRead - self.lastRead + i + 1) % self.__consSize[0] == 0: - strlist.append('\r\n') - else: - strlist.append(c) - - s = ''.join(strlist) - return s - - - def readConsoleToCursor(self): - """Reads from the current read position to the current cursor - position and inserts the string into self.__buffer.""" - - if not self.__consout: - return "" - - consinfo = self.__consout.GetConsoleScreenBufferInfo() - cursorPos = consinfo['CursorPosition'] - - logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo)) - - isSameX = cursorPos.X == self.__currentReadCo.X - isSameY = cursorPos.Y == self.__currentReadCo.Y - isSamePos = isSameX and isSameY - - logger.debug('isSameY: %r' % isSameY) - logger.debug('isSamePos: %r' % isSamePos) - - if isSameY or not self.lastReadData.endswith('\r\n'): - # Read the current slice again - self.totalRead -= self.lastRead - self.__currentReadCo.X = 0 - self.__currentReadCo.Y = self.__bufferY - - logger.debug('cursor: %r, current: %r' % (cursorPos, self.__currentReadCo)) - - raw = self.readConsole(self.__currentReadCo, cursorPos) - rawlist = [] - while raw: - rawlist.append(raw[:self.__consSize[0]]) - raw = raw[self.__consSize[0]:] - raw = ''.join(rawlist) - s = self.parseData(raw) - logger.debug(s) - for i, line in enumerate(reversed(rawlist)): - if line.endswith(screenbufferfillchar): - # Record the Y offset where the most recent line break was detected - self.__bufferY += len(rawlist) - i - break - - logger.debug('lastReadData: %r' % self.lastReadData) - logger.debug('s: %r' % s) - - if isSamePos and self.lastReadData == s: - logger.debug('isSamePos and self.lastReadData == s') - s = '' - - logger.debug('s: %r' % s) - - if s: - lastReadData = self.lastReadData - pos = self.getOffset(self.__currentReadCo) - self.lastReadData = s - if isSameY or not lastReadData.endswith('\r\n'): - # Detect changed lines - self.__buffer.seek(pos) - buf = self.__buffer.read() - logger.debug('buf: %r' % buf) - logger.debug('raw: %r' % raw) - if raw.startswith(buf): - # Line has grown - rawslice = raw[len(buf):] - # Update last read bytes so line breaks can be detected in parseData - lastRead = self.lastRead - self.lastRead = len(rawslice) - s = self.parseData(rawslice) - self.lastRead = lastRead - else: - # Cursor has been repositioned - s = '\r' + s - logger.debug('s: %r' % s) - self.__buffer.seek(pos) - self.__buffer.truncate() - self.__buffer.write(raw) - - self.__currentReadCo.X = cursorPos.X - self.__currentReadCo.Y = cursorPos.Y - - return s - - - def read_nonblocking(self, size): - """Reads data from the console if available, otherwise - returns empty string""" - - try: - self.switchTo() - time.sleep(.01) - - if self.__currentReadCo.Y > maxconsoleY: - time.sleep(.2) - - s = self.readConsoleToCursor() - - if self.__currentReadCo.Y > maxconsoleY: - self.refreshConsole() - - return s - - finally: - self.switchBack() - - raise Exception('Unreachable code...') # pragma: no cover - - - def refreshConsole(self): - """Clears the console after pausing the child and - reading all the data currently on the console.""" - - orig = win32console.PyCOORDType(0, 0) - self.__consout.SetConsoleCursorPosition(orig) - self.__currentReadCo.X = 0 - self.__currentReadCo.Y = 0 - writelen = self.__consSize[0] * self.__consSize[1] - # Use NUL as fill char because it displays as whitespace - # (if we interact() with the child) - self.__consout.FillConsoleOutputCharacter(screenbufferfillchar, writelen, orig) - - self.__bufferY = 0 - self.__buffer.truncate(0) - - consinfo = self.__consout.GetConsoleScreenBufferInfo() - cursorPos = consinfo['CursorPosition'] - logger.debug('refreshConsole: cursorPos %s' % cursorPos) - - - def setecho(self, state): - """Sets the echo mode of the child console. - This is a workaround of the setecho. The original GetConsoleMode() / SetConsoleMode() - methods didn't work. See git history for the concrete implementation. - 2020.01.09 raczben - """ - - self.local_echo = state - - def getecho(self): - """Returns the echo mode of the child console. - This is a workaround of the getecho. The original GetConsoleMode() / SetConsoleMode() - methods didn't work. See git history for the concrete implementation. - 2020.01.09 raczben - """ - - return self.local_echo - - def getwinsize(self): - """Returns the size of the child console as a tuple of - (rows, columns).""" - - self.switchTo() - try: - size = self.__consout.GetConsoleScreenBufferInfo()['Size'] - finally: - self.switchBack() - return (size.Y, size.X) - - def setwinsize(self, r, c): - """Sets the child console screen buffer size to (r, c).""" - - self.switchTo() - try: - self.__consout.SetConsoleScreenBufferSize(win32console.PyCOORDType(c, r)) - finally: - self.switchBack() - - def interact(self): - """Displays the child console for interaction.""" - - self.switchTo() - try: - win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_SHOW) - finally: - self.switchBack() - - def stop_interact(self): - """Hides the child console.""" - - self.switchTo() - try: - win32gui.ShowWindow(win32console.GetConsoleWindow(), win32con.SW_HIDE) - finally: - self.switchBack() - - def isalive(self, console=True): - """True if the child is still alive, false otherwise""" - - if console: - return win32process.GetExitCodeProcess(self.__conProcess) == win32con.STILL_ACTIVE - else: - return win32process.GetExitCodeProcess(self.__childProcess) == win32con.STILL_ACTIVE - class ConsoleReader: # pragma: no cover def __init__(self, path, pid, tid, env = None, cp=None, logdir=None):