# dropbear/test/test_dropbear.py
# (web source-view header: 115 lines, 2.7 KiB, Python)

import subprocess
import os
import pty
import tempfile
import logging
import time
import socketserver
import threading
import queue
import pytest
# Loopback address (inside 127.0.0.0/8) used for local test runs: the dropbear
# fixture passes it to the server's "-p" option, and dbclient connects to it
# when no remote host is configured.
LOCALADDR="127.0.5.5"
@pytest.fixture(scope="module")
def dropbear(request):
    """Module-scoped fixture that runs a local dropbear server for the tests.

    Reads ``remote``, ``dropbear``, ``hostkey`` and ``port`` from
    ``request.config.option``.

    Yields:
        None when the ``remote`` option is set (no local server is started),
        otherwise the ``subprocess.Popen`` handle of the running server.

    After the tests finish the server is terminated and whatever it still
    had to say on stderr is printed.
    """
    opt = request.config.option
    if opt.remote:
        yield None
        return

    args = [
        opt.dropbear,
        # Bind locally only. Dropbear's option is "-p [address:]port", so the
        # address and port have to be given together in one argument; a bare
        # address would be read as a port, and a bare port listens on every
        # address.
        "-p", f"{LOCALADDR}:{opt.port}",
        "-r", opt.hostkey,
        "-F", "-E",
    ]
    p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
    # Wait until it has started listening
    for line in p.stderr:
        if "Not backgrounding" in line:
            break
    # Check it's still running
    assert p.poll() is None
    # Ready
    yield p
    p.terminate()
    print("Terminated dropbear. Flushing output:")
    for line in p.stderr:
        print(line.rstrip())
    # stderr reached EOF, so the server is gone; reap it to avoid a zombie.
    p.wait()
    print("Done")
def dbclient(request, *args, **kwargs):
    """Run the dbclient binary against the server under test.

    The command line is ``<dbclient> -y <host> -p <port> [-l <user>] *args``,
    where the host is the ``remote`` option if set and ``LOCALADDR``
    otherwise. All values come from ``request.config.option``.

    Args:
        request: pytest ``request`` object giving access to the options.
        *args: extra command-line arguments appended after the base ones.
        **kwargs: passed through to ``subprocess``. The special key
            ``background`` is consumed here and never forwarded.

    Returns:
        A ``subprocess.Popen`` object when ``background`` is truthy,
        otherwise the ``subprocess.CompletedProcess`` from ``subprocess.run``
        (with a default ``timeout`` of 10 seconds).
    """
    opt = request.config.option
    host = opt.remote or LOCALADDR
    base_args = [opt.dbclient, "-y", host, "-p", opt.port]
    if opt.user:
        # Extend the base arguments: full_args does not exist yet at this
        # point, and the login option must precede the caller's arguments.
        base_args.extend(['-l', opt.user])
    full_args = base_args + list(args)

    # "background" is our own switch, not a subprocess keyword.
    bg = kwargs.pop("background", None)
    if bg:
        return subprocess.Popen(full_args, **kwargs)

    kwargs.setdefault("timeout", 10)
    # wait for response
    return subprocess.run(full_args, **kwargs)
class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """ Listens for a single incoming request, sends a response if given,
    and returns the inbound data.
    Response can be a queue object, in which case each item in the queue will
    be sent as a response, until it receives a None item.

    Intended to be used as a context manager: entering starts the server in
    a background thread, leaving stops it and closes the listening socket.
    """

    # ThreadingMixIn.server_close() waits for handler threads by default on
    # Python 3.7+. Keep __exit__ from blocking on a handler that is still
    # reading, as it did before the socket was closed explicitly.
    block_on_close = False

    def __init__(self, port, timeout, response=None):
        """Bind to ('localhost', port).

        Args:
            port: TCP port to listen on.
            timeout: stored as the server's ``timeout`` attribute.
            response: bytes to send to the peer, a ``queue.Queue`` of bytes
                items terminated by None, or None to send nothing.
        """
        super().__init__(('localhost', port), self.Handler)
        self.port = port
        self.timeout = timeout
        self.response = response
        # Data received from the peer; stays None until a request completes.
        self.sink = None

    class Handler(socketserver.StreamRequestHandler):
        def handle(self):
            """Send the configured response, then read until the peer closes."""
            response = self.server.response
            if isinstance(response, queue.Queue):
                while True:
                    item = response.get()
                    if item is None:
                        break
                    self.wfile.write(item)
            elif response:
                self.wfile.write(response)
            assert self.server.sink is None, ">1 request sent to handler"
            self.server.sink = self.rfile.read()

    def __enter__(self):
        self.server_thread = threading.Thread(target=self.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        return self

    def __exit__(self, *exc_stuff):
        self.shutdown()
        self.server_thread.join()
        # Release the listening socket so the port can be bound again.
        self.server_close()

    def inbound(self):
        """ Returns the data sent to the socket """
        return self.sink
def readall_socket(sock):
    """Read from ``sock`` in 4096-byte requests until ``recv`` returns an
    empty result, and return everything received as one bytes object."""
    chunks = []
    chunk = sock.recv(4096)
    while chunk:
        chunks.append(chunk)
        chunk = sock.recv(4096)
    return b''.join(chunks)
# returns a str
def random_alnum(size):
    """Build a random string of ASCII letters and digits, at most ``size``
    characters long.

    Draws ``500 + size*5`` bytes from ``os.urandom``, keeps only the
    alphanumeric ones and truncates the result to ``size``.
    """
    pool = os.urandom(500 + size*5)
    kept = bytearray()
    for value in pool:
        if bytes((value,)).isalnum():
            kept.append(value)
    return bytes(kept[:size]).decode()