mirror of
https://github.com/clearml/dropbear
synced 2025-02-07 05:17:28 +00:00
65f6e48a06
These initial tests are checking various edge cases of channel handling that have cropped up over the years.
110 lines
2.6 KiB
Python
110 lines
2.6 KiB
Python
import subprocess
|
|
import os
|
|
import pty
|
|
import tempfile
|
|
import logging
|
|
import time
|
|
import socketserver
|
|
import threading
|
|
import queue
|
|
|
|
import pytest
|
|
|
|
LOCALADDR="127.0.5.5"
|
|
|
|
@pytest.fixture(scope="module")
|
|
def dropbear(request):
|
|
opt = request.config.option
|
|
if opt.remote:
|
|
yield None
|
|
return
|
|
|
|
args = [opt.dropbear,
|
|
"-p", LOCALADDR, # bind locally only
|
|
"-r", opt.hostkey,
|
|
"-p", opt.port,
|
|
"-F", "-E",
|
|
]
|
|
p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
|
|
# Wait until it has started listening
|
|
for l in p.stderr:
|
|
if "Not backgrounding" in l:
|
|
break
|
|
# Check it's still running
|
|
assert p.poll() is None
|
|
# Ready
|
|
yield p
|
|
p.terminate()
|
|
|
|
def dbclient(request, *args, **kwargs):
|
|
opt = request.config.option
|
|
host = opt.remote or LOCALADDR
|
|
base_args = [opt.dbclient, "-y", host, "-p", opt.port]
|
|
if opt.user:
|
|
full_args.extend(['-l', opt.user])
|
|
full_args = base_args + list(args)
|
|
bg = kwargs.get("background")
|
|
if "background" in kwargs:
|
|
del kwargs["background"]
|
|
if bg:
|
|
return subprocess.Popen(full_args, **kwargs)
|
|
else:
|
|
# wait for response
|
|
return subprocess.run(full_args, **kwargs)
|
|
|
|
class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
|
""" Listens for a single incoming request, sends a response if given,
|
|
and returns the inbound data.
|
|
Reponse can be a queue object, in which case each item in the queue will
|
|
be sent as a response, until it receives a None item.
|
|
"""
|
|
def __init__(self, port, timeout, response=None):
|
|
super().__init__(('localhost', port), self.Handler)
|
|
self.port = port
|
|
self.timeout = timeout
|
|
self.response = response
|
|
self.sink = None
|
|
|
|
class Handler(socketserver.StreamRequestHandler):
|
|
def handle(self):
|
|
if isinstance(self.server.response, queue.Queue):
|
|
while True:
|
|
i = self.server.response.get()
|
|
if i is None:
|
|
break
|
|
self.wfile.write(i)
|
|
elif self.server.response:
|
|
self.wfile.write(self.server.response)
|
|
assert self.server.sink is None, ">1 request sent to handler"
|
|
self.server.sink = self.rfile.read()
|
|
|
|
def __enter__(self):
|
|
self.server_thread = threading.Thread(target=self.serve_forever)
|
|
self.server_thread.daemon = True
|
|
self.server_thread.start()
|
|
return self
|
|
|
|
def __exit__(self, *exc_stuff):
|
|
self.shutdown()
|
|
self.server_thread.join()
|
|
|
|
def inbound(self):
|
|
""" Returns the data sent to the socket """
|
|
return self.sink
|
|
|
|
def readall_socket(sock):
|
|
b = []
|
|
while True:
|
|
i = sock.recv(4096)
|
|
if not i:
|
|
break
|
|
b.append(i)
|
|
return b''.join(b)
|
|
|
|
# returns a str
|
|
def random_alnum(size):
|
|
r = os.urandom(500 + size*5)
|
|
return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()
|
|
|
|
|