Fix clearml-sync-workspace broken pipe

This commit is contained in:
clearml 2025-04-20 15:56:58 +03:00
parent 89beff247a
commit ca5f61ff09

View File

@ -1183,7 +1183,7 @@ def _sync_workspace_snapshot(task, param, auto_shutdown_task):
# upload actual snapshot tgz
timestamp = datetime.datetime.now(datetime.UTC) \
if hasattr(datetime, "UTC") and hasattr(datetime.datetime, "now") else datetime.datetime.utcnow()
if hasattr(datetime, "UTC") and hasattr(datetime.datetime, "now") else datetime.datetime.utcnow() # noqa
task.upload_artifact(
name=artifact_workspace_name,
artifact_object=Path(local_gzip),
@ -1338,8 +1338,8 @@ def verify_workspace_storage_access(store_workspace, task):
class SyncCallback:
pipe_file_name_c = "/tmp/clearml_sync_pipe_c"
pipe_file_name_r = "/tmp/clearml_sync_pipe_r"
pipe_file_name_c = "clearml_sync_pipe_c"
pipe_file_name_r = "clearml_sync_pipe_r"
magic = "DEFAULT"
cmd_file = "clearml-sync-workspace"
_original_stdout_write = None
@ -1368,19 +1368,28 @@ class SyncCallback:
self._monitor_process = monitor_process
self._workspace_dir = workspace_dir
SyncCallback.singleton = self
tmp_dir = Path(mkdtemp(prefix='session_'))
self.pipe_file_name_c = tmp_dir / SyncCallback.pipe_file_name_c
self.pipe_file_name_r = tmp_dir / SyncCallback.pipe_file_name_r
def init(self):
try:
if self._sync_func:
if self._sync_func:
try:
self._create_sync_object()
self._write_sync_cmd_file()
except Exception as ex:
print("Failed to create sync object: {}".format(ex))
try:
self._create_monitor_process()
self._shutdown_cmd()
except Exception as ex:
print("Failed to create shutdown cmd: {}".format(ex))
try:
self._write_ssh_banner()
except Exception as ex:
print("Failed to create sync object: {}".format(ex))
print("Failed to create ssh banner: {}".format(ex))
def background_sync_thread(self) -> None:
if not self._sync_func:
@ -1413,18 +1422,22 @@ class SyncCallback:
except Exception as ex:
print("WARNING: sync callback failed [{}]: {}".format(self._sync_func, ex))
pipe_out.write("\nEOF={}\n".format(timestamp).encode())
try:
pipe_out.write("\nEOF={}\n".format(timestamp).encode())
except Exception as ex:
self._restore_stdout()
print("Exception occurred while syncing: {}".format(ex))
# so that we push all our prints
# restore original stdout/stderr
self._restore_stdout()
self._fd = None
except Exception as ex:
self._restore_stdout()
print("Exception occurred while waiting for sync request: {}".format(ex))
print("Waiting for 60 seconds...")
sleep(60)
self._fd = None
print("Exception occurred while waiting for sync request: {}\nWaiting for 5 seconds...".format(ex))
sleep(5)
# maybe we will get here
os.remove(self.pipe_file_name_r)
@ -1572,6 +1585,7 @@ class SyncCallback:
except Exception as ex:
print("WARNING: Failed to write to run pid: {}".format(ex))
shutdown_cmd = None
try:
shutdown_cmd = shutdown_cmd_directory / "shutdown"
with open(shutdown_cmd, "wt") as f:
@ -1594,19 +1608,25 @@ class SyncCallback:
self._fd.write(message.encode())
self._fd.flush()
except Exception as ex:
self._original_stderr_write("WARNING: failed sending stdout over pipe: {}\n".format(ex))
self._original_stderr_write(" WARNING: failed sending stdout over pipe: {}\n".format(ex))
return ret
def _patch_stdout(self):
self._original_stdout_write = sys.stdout.write
self._original_stderr_write = sys.stderr.write
if not self._original_stdout_write:
self._original_stdout_write = sys.stdout.write
if not self._original_stderr_write:
self._original_stderr_write = sys.stderr.write
sys.stdout.write = partial(self._stdout__patched__write__, False,)
sys.stderr.write = partial(self._stdout__patched__write__, True,)
def _restore_stdout(self):
sys.stdout.write = self._original_stdout_write
sys.stderr.write = self._original_stderr_write
if self._original_stdout_write:
sys.stdout.write = self._original_stdout_write
self._original_stdout_write = None
if self._original_stderr_write:
sys.stderr.write = self._original_stderr_write
self._original_stderr_write = None
def _sync_cmd_function():