diff --git a/clearml_session/interactive_session_task.py b/clearml_session/interactive_session_task.py index bbd2e5a..cf099da 100644 --- a/clearml_session/interactive_session_task.py +++ b/clearml_session/interactive_session_task.py @@ -1406,6 +1406,8 @@ class SyncCallback: if not os.path.exists(self.pipe_file_name_r): os.mkfifo(self.pipe_file_name_r, 0o644) + timestamp = command.split("=", 1)[-1] + with open(self.pipe_file_name_r, 'wb') as pipe_out: self._fd = pipe_out # so that we push all our prints @@ -1416,7 +1418,7 @@ class SyncCallback: except Exception as ex: print("WARNING: sync callback failed [{}]: {}".format(self._sync_func, ex)) - pipe_out.write("\nEOF\n".encode()) + pipe_out.write("\nEOF={}\n".format(timestamp).encode()) # so that we push all our prints self._restore_stdout() @@ -1616,6 +1618,8 @@ def _sync_cmd_function(): # this is where we put all the imports and the sync call back import os import sys + from time import time + print("Storing workspace to persistent storage") try: if not os.path.exists(SyncCallback.pipe_file_name_c): @@ -1624,9 +1628,10 @@ def _sync_cmd_function(): print("ERROR: Failed creating request pipe {}".format(ex)) # push the request + timestamp = str(time()) try: with open(SyncCallback.pipe_file_name_c, 'wt') as pipe: - cmd = "{}:sync".format(SyncCallback.magic) + cmd = "{}:sync={}".format(SyncCallback.magic, timestamp) pipe.write(cmd) except Exception as ex: print("ERROR: Failed sending sync request {}".format(ex)) @@ -1636,17 +1641,28 @@ def _sync_cmd_function(): # Read the result from the server with open(SyncCallback.pipe_file_name_r, 'rb') as pipe: while True: - result = pipe.readline().decode() + result = pipe.readline() + if not result: + break + result = result.decode() if result.endswith("NEOL\n"): result = result[:-5] # read from fd - if "EOF" in result.split("\n"): - sys.stdout.write(result.replace("EOF\n", "\n")) - print("Workspace synced successfully") - break - sys.stdout.write(result) + if "EOF=" in result: + stop = False + for l in result.split("\n"): + if not l.startswith("EOF="): + sys.stdout.write(l+"\n") + elif l.endswith("={}".format(timestamp)): + stop = True + + if stop: + print("Workspace synced successfully") + break + else: + sys.stdout.write(result) except Exception as ex: print("ERROR: Failed reading sync request result {}".format(ex))