diff --git a/clearml_session/interactive_session_task.py b/clearml_session/interactive_session_task.py index 9e9d3ae..78fb4ad 100644 --- a/clearml_session/interactive_session_task.py +++ b/clearml_session/interactive_session_task.py @@ -1183,7 +1183,7 @@ def _sync_workspace_snapshot(task, param, auto_shutdown_task): # upload actual snapshot tgz timestamp = datetime.datetime.now(datetime.UTC) \ - if hasattr(datetime, "UTC") and hasattr(datetime.datetime, "now") else datetime.datetime.utcnow() + if hasattr(datetime, "UTC") and hasattr(datetime.datetime, "now") else datetime.datetime.utcnow() # noqa task.upload_artifact( name=artifact_workspace_name, artifact_object=Path(local_gzip), @@ -1338,8 +1338,8 @@ def verify_workspace_storage_access(store_workspace, task): class SyncCallback: - pipe_file_name_c = "/tmp/clearml_sync_pipe_c" - pipe_file_name_r = "/tmp/clearml_sync_pipe_r" + pipe_file_name_c = "clearml_sync_pipe_c" + pipe_file_name_r = "clearml_sync_pipe_r" magic = "DEFAULT" cmd_file = "clearml-sync-workspace" _original_stdout_write = None @@ -1368,19 +1368,28 @@ class SyncCallback: self._monitor_process = monitor_process self._workspace_dir = workspace_dir SyncCallback.singleton = self + tmp_dir = Path(mkdtemp(prefix='session_')) + self.pipe_file_name_c = tmp_dir / SyncCallback.pipe_file_name_c + self.pipe_file_name_r = tmp_dir / SyncCallback.pipe_file_name_r def init(self): - try: - if self._sync_func: + if self._sync_func: + try: self._create_sync_object() self._write_sync_cmd_file() + except Exception as ex: + print("Failed to create sync object: {}".format(ex)) + try: self._create_monitor_process() self._shutdown_cmd() + except Exception as ex: + print("Failed to create shutdown cmd: {}".format(ex)) + try: self._write_ssh_banner() except Exception as ex: - print("Failed to create sync object: {}".format(ex)) + print("Failed to create ssh banner: {}".format(ex)) def background_sync_thread(self) -> None: if not self._sync_func: @@ -1413,18 +1422,22 @@ class SyncCallback: except Exception as ex: print("WARNING: sync callback failed [{}]: {}".format(self._sync_func, ex)) - pipe_out.write("\nEOF={}\n".format(timestamp).encode()) + try: + pipe_out.write("\nEOF={}\n".format(timestamp).encode()) + except Exception as ex: + self._restore_stdout() + print("Exception occurred while syncing: {}".format(ex)) - # so that we push all our prints + # restore original stdout/stderr self._restore_stdout() self._fd = None except Exception as ex: self._restore_stdout() - print("Exception occurred while waiting for sync request: {}".format(ex)) - print("Waiting for 60 seconds...") - sleep(60) + self._fd = None + print("Exception occurred while waiting for sync request: {}\nWaiting for 5 seconds...".format(ex)) + sleep(5) # maybe we will get here os.remove(self.pipe_file_name_r) @@ -1572,6 +1585,7 @@ class SyncCallback: except Exception as ex: print("WARNING: Failed to write to run pid: {}".format(ex)) + shutdown_cmd = None try: shutdown_cmd = shutdown_cmd_directory / "shutdown" with open(shutdown_cmd, "wt") as f: @@ -1594,19 +1608,25 @@ class SyncCallback: self._fd.write(message.encode()) self._fd.flush() except Exception as ex: - self._original_stderr_write("WARNING: failed sending stdout over pipe: {}\n".format(ex)) + self._original_stderr_write(" WARNING: failed sending stdout over pipe: {}\n".format(ex)) return ret def _patch_stdout(self): - self._original_stdout_write = sys.stdout.write - self._original_stderr_write = sys.stderr.write + if not self._original_stdout_write: + self._original_stdout_write = sys.stdout.write + if not self._original_stderr_write: + self._original_stderr_write = sys.stderr.write sys.stdout.write = partial(self._stdout__patched__write__, False,) sys.stderr.write = partial(self._stdout__patched__write__, True,) def _restore_stdout(self): - sys.stdout.write = self._original_stdout_write - sys.stderr.write = self._original_stderr_write + if self._original_stdout_write: + sys.stdout.write = self._original_stdout_write + self._original_stdout_write = None + if self._original_stderr_write: + sys.stderr.write = self._original_stderr_write + self._original_stderr_write = None def _sync_cmd_function():