From fcfa20bade6251f7bb9803108c93b1b134d37c84 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 31 Dec 2020 22:19:32 +0200 Subject: [PATCH] Improve Windows stable connection support layer --- clearml_session/single_thread_proxy.py | 74 +++++++++++++++----------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/clearml_session/single_thread_proxy.py b/clearml_session/single_thread_proxy.py index 1a306de..0f5ea0d 100644 --- a/clearml_session/single_thread_proxy.py +++ b/clearml_session/single_thread_proxy.py @@ -13,10 +13,11 @@ class SingleThreadProxy(object): self.forward = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def start(self, host, port): + # noinspection PyBroadException try: self.forward.connect((host, port)) return self.forward - except Exception as e: + except Exception: return False def __init__(self, port, tgtport, host="127.0.0.1", tgthost="127.0.0.1", @@ -51,39 +52,48 @@ class SingleThreadProxy(object): def main_loop(self): self.input_list.append(self.server) - ss = select.select while 1: time.sleep(self.delay) + # noinspection PyBroadException try: - inputready, outputready, exceptready = ss(self.input_list, [], []) - except: + inputready, outputready, exceptready = select.select(self.input_list, [], []) + except Exception: continue - for self.s in inputready: - if self.s == self.server: + for s in inputready: + if s == self.server: + # noinspection PyBroadException try: self.on_accept() - except: + except Exception: pass break + # noinspection PyBroadException try: - self.data = self.s.recv(self.buffer_size) - except: + data = s.recv(self.buffer_size) + except ConnectionResetError: + # this will trigger on_close + data = [] + except Exception: continue - if len(self.data) == 0: + + if len(data) == 0: + # noinspection PyBroadException try: - self.on_close() - except: + self.on_close(s) + except Exception: pass break else: + # noinspection PyBroadException try: - self.on_recv() - except: + self.on_recv(s, data) + except Exception: pass def on_accept(self): clientsock, clientaddr = self.server.accept() + forward = None for i in range(self.max_timeout_for_remote_connection): forward = self.Forward().start(self.tgthost, self.tgtport) if forward: @@ -92,35 +102,35 @@ class SingleThreadProxy(object): time.sleep(1) if forward: - # logger.info("{0} has connected".format(clientaddr)) + # print("{0} has connected".format(clientaddr)) self.input_list.append(clientsock) self.input_list.append(forward) self.channel[clientsock] = forward self.channel[forward] = clientsock - _sidbase = "{0}_{1}_{2}_{3}".format(self.tgthost, self.tgtport, clientaddr[0], clientaddr[1]) - self.sidmap[clientsock] = (_sidbase, 1) - self.sidmap[forward] = (_sidbase, -1) + sidbase = "{0}_{1}_{2}_{3}".format(self.tgthost, self.tgtport, clientaddr[0], clientaddr[1]) + self.sidmap[clientsock] = (sidbase, 1) + self.sidmap[forward] = (sidbase, -1) else: - # logger.warn("Can't establish connection with remote server.\n" - # "Closing connection with client side{0}".format(clientaddr)) + # print("Can't establish connection with remote server.\n" + # "Closing connection with client side{0}".format(clientaddr)) clientsock.close() - def on_close(self): + def on_close(self, s): # logger.info("{0} has disconnected".format(self.s.getpeername())) + # print("has disconnected") - self.input_list.remove(self.s) - self.input_list.remove(self.channel[self.s]) - out = self.channel[self.s] + self.input_list.remove(s) + self.input_list.remove(self.channel[s]) + out = self.channel[s] self.channel[out].close() - self.channel[self.s].close() + self.channel[s].close() del self.channel[out] - del self.channel[self.s] + del self.channel[s] del self.sidmap[out] - del self.sidmap[self.s] + del self.sidmap[s] - def on_recv(self): - _sidbase = self.sidmap[self.s][0] - _c_or_s = self.sidmap[self.s][1] - data = self.data + def on_recv(self, s, data): + _sidbase = self.sidmap[s][0] + _c_or_s = self.sidmap[s][1] # logger.debug(ctrl_less(data.strip())) - self.channel[self.s].send(data) + self.channel[s].send(data)