mirror of
https://github.com/clearml/clearml-session
synced 2025-05-05 12:34:34 +00:00
Upgrade vscode_version = '4.99.2' & python_ext_version = '2025.4.0'
Add `clearml-sync-workspace` & `shutdown` commands. Add --disable-storage-packages (disables the automatic boto3/gs/azure package install). Add support for Fedora/CentOS/Rocky containers. Add vscode support for ARM machines. CLEARML_SESSION_VSCODE_SERVER_DEB can now be a prefix only, e.g. "https://github.com/coder/code-server/releases/download/v{version}/code-server_{version}"
This commit is contained in:
parent
c59ca8e530
commit
03697e0cbf
@ -189,16 +189,21 @@ def create_base_task(state, project_name=None, task_name=None, continue_task_id=
|
||||
task_script = task.data.script.to_dict()
|
||||
base_script_file = os.path.abspath(os.path.join(__file__, '..', 'tcp_proxy.py'))
|
||||
with open(base_script_file, 'rt') as f:
|
||||
task_script['diff'] = f.read()
|
||||
# notice lines always end with \n
|
||||
task_script['diff'] = "".join([line for line in f.readlines() if not line.lstrip().startswith("#")])
|
||||
base_script_file = os.path.abspath(os.path.join(__file__, '..', 'interactive_session_task.py'))
|
||||
with open(base_script_file, 'rt') as f:
|
||||
task_script['diff'] += '\n\n' + f.read()
|
||||
task_script['diff'] += "\n\n"
|
||||
# notice lines always end with \n
|
||||
task_script['diff'] += "".join([line for line in f.readlines() if not line.lstrip().startswith("#")])
|
||||
|
||||
task_script['working_dir'] = '.'
|
||||
task_script['entry_point'] = '.interactive_session.py'
|
||||
task_script['requirements'] = {'pip': '\n'.join(
|
||||
["clearml>=1.1.5"] +
|
||||
(["jupyter", "jupyterlab", "jupyterlab_git", "traitlets"] if state.get('jupyter_lab') else []) +
|
||||
(["boto3>=1.9", "azure-storage-blob>=12.0.0", "google-cloud-storage>=1.13.2"]
|
||||
if not state.get('disable_storage_packages') else []) + \
|
||||
(['pylint'] if state.get('vscode_server') else []))}
|
||||
|
||||
section, _, _ = _get_config_section_name()
|
||||
@ -267,7 +272,7 @@ def create_debugging_task(state, debug_task_id, task_name=None, task_project_id=
|
||||
|
||||
base_script_file = os.path.abspath(os.path.join(__file__, '..', 'interactive_session_task.py'))
|
||||
with open(base_script_file, 'rt') as f:
|
||||
entry_diff = ['+'+line.rstrip() for line in f.readlines()]
|
||||
entry_diff = ['+'+line.rstrip() for line in f.readlines() if not line.lstrip().startswith("#")]
|
||||
entry_diff_header = \
|
||||
"diff --git a/__interactive_session__.py b/__interactive_session__.py\n" \
|
||||
"--- a/__interactive_session__.py\n" \
|
||||
@ -281,6 +286,8 @@ def create_debugging_task(state, debug_task_id, task_name=None, task_project_id=
|
||||
state['packages'] = \
|
||||
(state.get('packages') or []) + ["clearml"] + \
|
||||
(["jupyter", "jupyterlab", "jupyterlab_git", "traitlets"] if state.get('jupyter_lab') else []) + \
|
||||
(["boto3>=1.9", "azure-storage-blob>=12.0.0", "google-cloud-storage>=1.13.2"]
|
||||
if not state.get('disable_storage_packages') else []) + \
|
||||
(['pylint'] if state.get('vscode_server') else [])
|
||||
task.update_task(task_state)
|
||||
section, _, _ = _get_config_section_name()
|
||||
@ -420,9 +427,10 @@ def _get_running_tasks(client, prev_task_id):
|
||||
'page_size': 10, 'page': 0,
|
||||
'order_by': ['-last_update'],
|
||||
'user': [current_user_id],
|
||||
'only_fields': ['id', 'created', 'parent']
|
||||
'only_fields': ['id', 'created', 'parent', 'status_message']
|
||||
})
|
||||
tasks_id_created = [(t.id, t.created, t.parent) for t in previous_tasks]
|
||||
tasks_id_created = [(t.id, t.created, t.parent) for t in previous_tasks
|
||||
if "stopping" not in (t.status_message or "")]
|
||||
if prev_task_id and prev_task_id not in (t[0] for t in tasks_id_created):
|
||||
# manually check the last task.id
|
||||
try:
|
||||
@ -431,13 +439,13 @@ def _get_running_tasks(client, prev_task_id):
|
||||
'id': [prev_task_id],
|
||||
'page_size': 10, 'page': 0,
|
||||
'order_by': ['-last_update'],
|
||||
'only_fields': ['id', 'created', 'parent']
|
||||
'only_fields': ['id', 'created', 'parent', 'status_message']
|
||||
})
|
||||
except APIError:
|
||||
# we could not find previous task, nothing to worry about.
|
||||
prev_tasks = None
|
||||
|
||||
if prev_tasks:
|
||||
if prev_tasks and "stopping" not in (prev_tasks[0].status_message or ""):
|
||||
tasks_id_created += [(prev_tasks[0].id, prev_tasks[0].created, prev_tasks[0].parent)]
|
||||
|
||||
return tasks_id_created
|
||||
@ -1462,6 +1470,10 @@ def setup_parser(parser):
|
||||
parser.add_argument('--force-dropbear', default=None, nargs='?', const='true', metavar='true/false',
|
||||
type=lambda x: (str(x).strip().lower() in ('true', 'yes')),
|
||||
help='Force using `dropbear` instead of SSHd')
|
||||
parser.add_argument('--disable-storage-packages', default=None, nargs='?', const='true', metavar='true/false',
|
||||
type=lambda x: (str(x).strip().lower() in ('true', 'yes')),
|
||||
help='If True automatic boto3/azure-storage-blob/google-cloud-storage python '
|
||||
'packages will not be added, you can manually add them using --packages')
|
||||
parser.add_argument('--disable-store-defaults', action='store_true', default=None,
|
||||
help='If set, do not store current setup as new default configuration')
|
||||
parser.add_argument('--disable-fingerprint-check', action='store_true', default=None,
|
||||
|
@ -9,8 +9,10 @@ from copy import deepcopy
|
||||
import getpass
|
||||
from functools import partial
|
||||
from tempfile import mkstemp, gettempdir, mkdtemp
|
||||
from threading import Thread
|
||||
from time import sleep, time
|
||||
from datetime import datetime
|
||||
import datetime
|
||||
from uuid import uuid4
|
||||
|
||||
import psutil
|
||||
import requests
|
||||
@ -261,18 +263,23 @@ def monitor_jupyter_server(fd, local_filename, process, task, jupyter_port, host
|
||||
task.set_parameter(name='properties/jupyter_url', value=jupyter_url)
|
||||
|
||||
print('\nJupyter Lab URL: {}\n'.format(jupyter_url))
|
||||
# if we got here, we have a token and we can leave
|
||||
break
|
||||
|
||||
# skip cleanup the process is still running
|
||||
# cleanup
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
os.close(fd)
|
||||
except Exception:
|
||||
pass
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
os.unlink(local_filename)
|
||||
except Exception:
|
||||
pass
|
||||
# # noinspection PyBroadException
|
||||
# try:
|
||||
# os.close(fd)
|
||||
# except Exception:
|
||||
# pass
|
||||
# # noinspection PyBroadException
|
||||
# try:
|
||||
# os.unlink(local_filename)
|
||||
# except Exception:
|
||||
# pass
|
||||
|
||||
return process
|
||||
|
||||
|
||||
def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.1", port=None):
|
||||
@ -281,8 +288,8 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.
|
||||
|
||||
# get vscode version and python extension version
|
||||
# they are extremely flaky, this combination works, most do not.
|
||||
vscode_version = '4.96.2'
|
||||
python_ext_version = '2024.22.1'
|
||||
vscode_version = '4.99.2'
|
||||
python_ext_version = '2025.4.0'
|
||||
if param.get("vscode_version"):
|
||||
vscode_version_parts = param.get("vscode_version").split(':')
|
||||
vscode_version = vscode_version_parts[0]
|
||||
@ -301,9 +308,27 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.
|
||||
# example of CLEARML_SESSION_VSCODE_SERVER_DEB value
|
||||
# 'https://github.com/coder/code-server/releases/download/v4.96.2/code-server_4.96.2_amd64.deb'
|
||||
# (see https://github.com/coder/code-server/releases)
|
||||
|
||||
code_server_deb_download_link = \
|
||||
os.environ.get("CLEARML_SESSION_VSCODE_SERVER_DEB") or \
|
||||
'https://github.com/coder/code-server/releases/download/v{version}/code-server_{version}_amd64.deb'
|
||||
'https://github.com/coder/code-server/releases/download/v{version}/code-server_{version}'
|
||||
|
||||
# support x86/arm dnf/deb
|
||||
if not code_server_deb_download_link.endswith(".deb") and not code_server_deb_download_link.endswith(".rpm"):
|
||||
import platform
|
||||
if "arm" in (platform.processor() or "").lower():
|
||||
code_server_deb_download_link += "_arm64"
|
||||
else:
|
||||
code_server_deb_download_link += "_amd64"
|
||||
|
||||
if shutil.which("dnf") and not shutil.which("apt"):
|
||||
code_server_deb_download_link += ".rpm"
|
||||
parts = code_server_deb_download_link.split("/")
|
||||
# rpm links with "-" instead of "_" in the rpm filename
|
||||
parts[-1] = parts[-1].replace("_", "-")
|
||||
code_server_deb_download_link = "/".join(parts)
|
||||
else:
|
||||
code_server_deb_download_link += ".deb"
|
||||
|
||||
pre_installed = False
|
||||
python_ext = None
|
||||
@ -315,7 +340,7 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.
|
||||
# check if preinstalled
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
vscode_path = subprocess.check_output('which code-server', shell=True).decode().strip()
|
||||
vscode_path = shutil.which("code-server")
|
||||
pre_installed = bool(vscode_path)
|
||||
except Exception:
|
||||
vscode_path = None
|
||||
@ -329,7 +354,10 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.
|
||||
code_server_deb = StorageManager.get_local_copy(
|
||||
code_server_deb_download_link.format(version=vscode_version),
|
||||
extract_archive=False)
|
||||
os.system("dpkg -i {}".format(code_server_deb))
|
||||
if shutil.which("dnf") and not shutil.which("dpkg"):
|
||||
os.system("dnf install -y {}".format(code_server_deb))
|
||||
else:
|
||||
os.system("dpkg -i {}".format(code_server_deb))
|
||||
except Exception as ex:
|
||||
print("Failed installing vscode server: {}".format(ex))
|
||||
return
|
||||
@ -340,7 +368,7 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.
|
||||
# check if code-server exists
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
vscode_path = subprocess.check_output('which code-server', shell=True).decode().strip()
|
||||
vscode_path = shutil.which("code-server")
|
||||
assert vscode_path
|
||||
except Exception:
|
||||
print('Error: Cannot install code-server (not root) and could not find code-server executable, skipping.')
|
||||
@ -494,10 +522,7 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.
|
||||
|
||||
def start_jupyter_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.1", port=None):
|
||||
if not param.get('jupyterlab', True):
|
||||
print('no jupyterlab to monitor - going to sleep')
|
||||
while True:
|
||||
sleep(10.)
|
||||
return # noqa
|
||||
return
|
||||
|
||||
# execute jupyter notebook
|
||||
fd, local_filename = mkstemp()
|
||||
@ -618,13 +643,14 @@ def setup_ssh_server(hostname, hostnames, param, task, env):
|
||||
# noinspection SpellCheckingInspection
|
||||
os.system(
|
||||
"export PYTHONPATH=\"\" && "
|
||||
"([ ! -z $(which sshd) ] || "
|
||||
"(apt-get update ; DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server)) && "
|
||||
"([ ! -z $(command -v sshd) ] || "
|
||||
"(DEBIAN_FRONTEND=noninteractive apt-get update ; DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server) || "
|
||||
"(dnf install -y openssh-server)) && "
|
||||
"mkdir -p /var/run/sshd && "
|
||||
"echo 'root:{password}' | chpasswd && "
|
||||
"echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config && "
|
||||
"sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config && "
|
||||
"sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd "
|
||||
"sed -i 's|PermitRootLogin prohibit-password|PermitRootLogin yes|g' /etc/ssh/sshd_config && "
|
||||
"sed -i 's|session\\s*required\\s*pam_loginuid.so|session optional pam_loginuid.so|g' /etc/pam.d/sshd "
|
||||
"&& " # noqa: W605
|
||||
"echo 'ClientAliveInterval 10' >> /etc/ssh/sshd_config && "
|
||||
"echo 'ClientAliveCountMax 20' >> /etc/ssh/sshd_config && "
|
||||
@ -633,10 +659,10 @@ def setup_ssh_server(hostname, hostnames, param, task, env):
|
||||
'echo "export VISIBLE=now" >> /etc/profile && '
|
||||
'echo "export PATH=$PATH" >> /etc/profile && '
|
||||
'echo "ldconfig" 2>/dev/null >> /etc/profile && '
|
||||
'echo "export CLEARML_CONFIG_FILE={trains_config_file}" >> /etc/profile'.format(
|
||||
'echo "export CLEARML_CONFIG_FILE={clearml_config_file}" >> /etc/profile'.format(
|
||||
password=ssh_password,
|
||||
port=port,
|
||||
trains_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"),
|
||||
clearml_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"),
|
||||
)
|
||||
)
|
||||
sshd_path = '/usr/sbin/sshd'
|
||||
@ -646,8 +672,10 @@ def setup_ssh_server(hostname, hostnames, param, task, env):
|
||||
# check if sshd exists
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
os.system('echo "export CLEARML_CONFIG_FILE={trains_config_file}" >> $HOME/.profile'.format(
|
||||
trains_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"),
|
||||
os.system(
|
||||
'echo "export PATH=$PATH" >> $HOME/.profile && '
|
||||
'echo "export CLEARML_CONFIG_FILE={clearml_config_file}" >> $HOME/.profile'.format(
|
||||
clearml_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"),
|
||||
))
|
||||
except Exception:
|
||||
print("warning failed setting ~/.profile")
|
||||
@ -1087,7 +1115,7 @@ def run_user_init_script(task):
|
||||
os.environ['CLEARML_DOCKER_BASH_SCRIPT'] = str(init_script)
|
||||
|
||||
|
||||
def _sync_workspace_snapshot(task, param):
|
||||
def _sync_workspace_snapshot(task, param, auto_shutdown_task):
|
||||
workspace_folder = param.get("store_workspace")
|
||||
if not workspace_folder:
|
||||
# nothing to do
|
||||
@ -1142,12 +1170,14 @@ def _sync_workspace_snapshot(task, param):
|
||||
archive_preview += '{} - {:,} B\n'.format(relative_file_name, filename.stat().st_size)
|
||||
|
||||
# upload actual snapshot tgz
|
||||
timestamp = datetime.datetime.now(datetime.UTC) \
|
||||
if hasattr(datetime, "UTC") and hasattr(datetime.datetime, "now") else datetime.datetime.utcnow()
|
||||
task.upload_artifact(
|
||||
name=artifact_workspace_name,
|
||||
artifact_object=Path(local_gzip),
|
||||
delete_after_upload=True,
|
||||
preview=archive_preview,
|
||||
metadata={"timestamp": str(datetime.utcnow()), sync_workspace_creating_id: task.id},
|
||||
metadata={"timestamp": str(timestamp), sync_workspace_creating_id: task.id},
|
||||
wait_on_upload=True,
|
||||
retries=3
|
||||
)
|
||||
@ -1185,26 +1215,27 @@ def _sync_workspace_snapshot(task, param):
|
||||
param["workspace_hash"] = workspace_hash
|
||||
# noinspection PyProtectedMember
|
||||
task._set_runtime_properties(runtime_properties={sync_runtime_property: time()})
|
||||
print("[{}] Workspace '{}' snapshot synced".format(datetime.utcnow(), workspace_folder))
|
||||
print("[{}] Workspace '{}' snapshot synced".format(timestamp, workspace_folder))
|
||||
except Exception as ex:
|
||||
print("ERROR: Failed syncing workspace [{}]: {}".format(workspace_folder, ex))
|
||||
finally:
|
||||
if prev_status in ("failed", ):
|
||||
task.mark_failed(force=True, status_message="workspace shutdown sync completed")
|
||||
elif prev_status in ("completed", ):
|
||||
task.mark_completed(force=True, status_message="workspace shutdown sync completed")
|
||||
else:
|
||||
task.mark_stopped(force=True, status_message="workspace shutdown sync completed")
|
||||
if auto_shutdown_task:
|
||||
if prev_status in ("failed", ):
|
||||
task.mark_failed(force=True, status_message="workspace shutdown sync completed")
|
||||
elif prev_status in ("completed", ):
|
||||
task.mark_completed(force=True, status_message="workspace shutdown sync completed")
|
||||
else:
|
||||
task.mark_stopped(force=True, status_message="workspace shutdown sync completed")
|
||||
|
||||
|
||||
def sync_workspace_snapshot(task, param):
|
||||
def sync_workspace_snapshot(task, param, auto_shutdown_task=True):
|
||||
__poor_lock.append(time())
|
||||
if len(__poor_lock) != 1:
|
||||
# someone is already in, we should leave
|
||||
__poor_lock.pop(-1)
|
||||
|
||||
try:
|
||||
return _sync_workspace_snapshot(task, param)
|
||||
return _sync_workspace_snapshot(task, param, auto_shutdown_task=auto_shutdown_task)
|
||||
finally:
|
||||
__poor_lock.pop(-1)
|
||||
|
||||
@ -1261,7 +1292,358 @@ def restore_workspace(task, param):
|
||||
return workspace_folder
|
||||
|
||||
|
||||
def verify_workspace_storage_access(store_workspace, task):
    """
    Verify the task output storage can be used before a workspace is stored.

    Notice: this function terminates the process (exit code 1) when assigning
    `task.output_uri` raises ValueError, i.e. the storage target was rejected.

    :param store_workspace: workspace folder requested to be stored,
        if empty/None nothing is verified
    :param task: object exposing a readable and assignable `output_uri` attribute
    :return: True if storing was not requested or the storage check passed
    """
    if not store_workspace:
        return True

    # check that we have credentials to upload the artifact:
    # we assign output_uri (falling back to True if nothing was set) and rely on the
    # assignment raising ValueError when the target is not usable,
    # then we put back the original value so the task setup is left as we found it
    try:
        original_output_uri = task.output_uri
        task.output_uri = task.output_uri or True
        task.output_uri = original_output_uri
    except ValueError as ex:
        print(
            "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n"
            "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n"
            "Error!\n"
            " `store_workspace` requested but target storage is not accessible!\n"
            " \n"
            " Storage configuration server error - could not store working session\n"
            " If you are using GS/Azure/S3 (or compatible) make sure to\n"
            " 1. Verify your credentials in the ClearML Vault\n"
            " 2. Add the relevant python package to the session:\n"
            " Azure: azure-storage-blob>=12.0.0\n"
            " GS: google-cloud-storage>=1.13.2\n"
            " S3: boto3>=1.9\n"
            "Exception:\n"
            f"{ex}\n"
            "*************************************************************************\n"
            "*************************************************************************\n"
        )
        # do not throw the exception itself, because it will confuse readers.
        # SystemExit is raised directly: the `exit()` helper is injected by the `site`
        # module and is not guaranteed to exist in every interpreter setup
        raise SystemExit(1)

    return True
|
||||
|
||||
class SyncCallback:
    """
    Expose a "sync now" command and a `shutdown` command inside the session.

    A small script (`cmd_file`) is written into one of the PATH folders. When executed,
    it writes "<magic>:<command>" into the request pipe (`pipe_file_name_c`); the
    background thread of this object then calls the sync function and streams everything
    printed meanwhile into the reply pipe (`pipe_file_name_r`), terminated by an "EOF" line.

    The object also waits on a "monitor" process: when that process ends
    (e.g. killed by the generated `shutdown` script) `wait_on_process` returns.
    """
    # request pipe: written by the sync command, read by the background thread
    pipe_file_name_c = "/tmp/clearml_sync_pipe_c"
    # reply pipe: written by the background thread, read by the sync command
    pipe_file_name_r = "/tmp/clearml_sync_pipe_r"
    # request prefix, replaced with a random uuid4 per instance (see __init__)
    magic = "DEFAULT"
    # name of the generated sync command
    cmd_file = "clearml-sync-workspace"
    # original stdout/stderr write functions, set while the streams are patched
    _original_stdout_write = None
    _original_stderr_write = None
    # banner template, formatted with `workspace_dir` and `clearml_sync_cmd`
    ssh_banner = [
        "#!/bin/bash",
        "echo \"\"",
        "echo \"ClearML-Session:\"",
        "echo \" * Workspace at {workspace_dir} will be automatically synced at the end of the session, "
        "or manually by running '{clearml_sync_cmd}' command\"",
        "echo \" * Close session from the web UI or by running 'shutdown' command as root.\"",
        "echo \"\"",
        "",
    ]
    # last created instance
    singleton = None

    def __init__(
            self,
            sync_function: callable = None,
            monitor_process: subprocess.Popen = None,
            workspace_dir: str = None
    ):
        """
        :param sync_function: callable (no arguments) performing the actual sync,
            if None no sync command is created
        :param monitor_process: process to wait on in `wait_on_process`,
            if None a dummy `sleep` process is created in `init`
        :param workspace_dir: workspace folder, only used for the login banner text
        """
        self.magic = str(uuid4())
        self._fd = None
        self._sync_func = sync_function
        self._monitor_process = monitor_process
        self._workspace_dir = workspace_dir
        SyncCallback.singleton = self

    def init(self):
        """
        Create the pipes, the sync/shutdown commands and the login banner.
        Best effort: any failure is printed and not raised.
        """
        try:
            if self._sync_func:
                self._create_sync_object()
                self._write_sync_cmd_file()

            self._create_monitor_process()
            self._shutdown_cmd()

            self._write_ssh_banner()
        except Exception as ex:
            print("Failed to create sync object: {}".format(ex))

    def background_sync_thread(self) -> None:
        """
        Serve sync requests forever (returns immediately if there is no sync function).

        Every request must be prefixed with this instance `magic`, anything else is ignored.
        While the sync function runs, stdout/stderr writes are mirrored into the reply pipe.
        """
        if not self._sync_func:
            return

        while True:
            try:
                # Open the pipe for reading
                with open(self.pipe_file_name_c, 'rt') as pipe:
                    command = pipe.read().strip()

                if not command or command.split(":", 1)[0] != self.magic:
                    continue
                command = command.split(":", 1)[-1]
                if not command:
                    continue

                print(f"Received command: {command}")
                if not os.path.exists(self.pipe_file_name_r):
                    os.mkfifo(self.pipe_file_name_r, 0o644)

                with open(self.pipe_file_name_r, 'wb') as pipe_out:
                    self._fd = pipe_out
                    # so that we push all our prints
                    self._patch_stdout()
                    try:
                        try:
                            self._sync_func()
                        except Exception as ex:
                            print("WARNING: sync callback failed [{}]: {}".format(self._sync_func, ex))

                        pipe_out.write("\nEOF\n".encode())
                    finally:
                        # always stop mirroring, the pipe is about to be closed
                        self._restore_stdout()
                        self._fd = None

            except Exception as ex:
                # make sure we never keep writing into a closed pipe
                self._fd = None
                self._restore_stdout()
                print("Exception occurred while waiting for sync request: {}".format(ex))
                print("Waiting for 60 seconds...")
                sleep(60)

        # maybe we will get here (currently the loop above never breaks)
        os.remove(self.pipe_file_name_r)
        os.remove(self.pipe_file_name_c)

    def wait_on_process(self, run_background_sync_thread=True, call_sync_callback_on_return=True):
        """
        Block until the monitored process ends.

        :param run_background_sync_thread: if True serve sync requests while waiting.
            Notice: without a monitored process the sync loop runs in the calling thread
        :param call_sync_callback_on_return: if True call the sync function once before returning
        """
        if not self._monitor_process:
            # if we do not have a process to wait just call the sync background
            if run_background_sync_thread:
                self.background_sync_thread()
        else:
            # start background thread
            if run_background_sync_thread:
                Thread(target=self.background_sync_thread, daemon=True).start()
            # wait on process
            self._monitor_process.wait()

        if call_sync_callback_on_return and self._sync_func:
            self._sync_func()

    def _create_sync_object(self) -> None:
        """Create the request/reply named pipes if they do not exist."""
        for pipe_name in (self.pipe_file_name_c, self.pipe_file_name_r):
            if not os.path.exists(pipe_name):
                os.mkfifo(pipe_name, 0o644)

    def _create_monitor_process(self):
        """Create a long sleeping process to wait on, if no process was provided."""
        if self._monitor_process:
            return
        sleep_cmd = shutil.which("sleep")
        self._monitor_process = subprocess.Popen([sleep_cmd, "999d"], shell=False)

    def _write_sync_cmd_file(self):
        """
        Write the sync command (`cmd_file`) into the first usable PATH folder.

        The script is the source code of `_sync_cmd_function`, with the references to the
        pipe names and the magic replaced by the actual values of this instance.
        """
        import inspect
        source_function = inspect.getsource(_sync_cmd_function)
        source_function = "#!{}\n\n".format(sys.executable) + source_function
        source_function = source_function.replace("SyncCallback.pipe_file_name_c",
                                                  "\"{}\"".format(self.pipe_file_name_c))
        source_function = source_function.replace("SyncCallback.pipe_file_name_r",
                                                  "\"{}\"".format(self.pipe_file_name_r))
        source_function = source_function.replace("SyncCallback.magic", "\"{}\"".format(self.magic))
        source_function += "\nif __name__ == \"__main__\":\n    {}()\n".format("_sync_cmd_function")

        full_path = None
        for p in os.environ.get("PATH", "/usr/bin").split(os.pathsep):
            # noinspection PyBroadException
            try:
                if not Path(p).is_dir():
                    continue
                candidate = Path(p) / self.cmd_file
                candidate.touch(exist_ok=True)
            except Exception:
                # we cannot write to this folder, try the next one
                continue
            # only keep a path we actually managed to create
            full_path = candidate
            break

        if not full_path:
            print("ERROR: Failed to create sync execution cmd")
            return

        print("Creating sync command in: {}".format(full_path))

        try:
            with open(full_path, "wt") as f:
                f.write(source_function)
            os.chmod(full_path, 0o777)
        except Exception as ex:
            print("ERROR: Failed to create sync execution cmd {}: {}".format(full_path, ex))

    def _write_ssh_banner(self):
        """
        Write the login banner.

        Preferred target is an executable script under /etc/update-motd.d/ (if the folder
        exists), otherwise the banner is appended to /etc/profile, falling back to ~/.profile
        """
        motd_dir = Path("/etc/update-motd.d/")
        make_exec = motd_dir.is_dir()
        banner_file = (motd_dir / "99-clearml") if make_exec else Path("/etc/profile").expanduser()

        # noinspection PyBroadException
        try:
            banner_file.touch(exist_ok=True)
        except Exception:
            banner_file = Path("~/.profile").expanduser()
            # the fallback is a profile file we append to, not a standalone executable script
            make_exec = False
            # noinspection PyBroadException
            try:
                banner_file.touch(exist_ok=True)
            except Exception:
                print("WARNING: failed creating ssh banner")
                return

        try:
            ssh_banner = list(self.ssh_banner)
            if not self._workspace_dir:
                # no workspace is stored, do not advertise a sync of "None"
                ssh_banner = [line for line in ssh_banner if "{workspace_dir}" not in line]

            # skip first `#!/bin/bash` line if this is not an executable, and add a new line instead
            if not make_exec:
                ssh_banner = [""] + ssh_banner[1:]

            # our own motd script is rewritten (so restarts do not duplicate it),
            # profile files belong to the user so we only append to them
            with open(banner_file, "wt" if make_exec else "at") as f:
                f.write("\n".join(ssh_banner).format(
                    workspace_dir=self._workspace_dir, clearml_sync_cmd=self.cmd_file
                ))

            if make_exec:
                os.chmod(banner_file.as_posix(), 0o755)

        except Exception as ex:
            print("WARNING: Failed to write to banner {}: {}".format(banner_file, ex))

    def _shutdown_cmd(self):
        """
        Create a `shutdown` script killing the monitored process (its pid is stored in /run/clearml).

        The script replaces the first `shutdown` file we manage to remove from the PATH folders,
        if none could be removed it is created in the first existing PATH folder.
        """
        batch_command = [
            "#!/bin/bash",
            "[ \"$UID\" -ne 0 ] && echo \"shutdown: Permission denied. Try as root.\" && exit 1",
            "[ ! -f /run/clearml ] && echo \"shutdown: failed.\" && exit 2",
            "kill -9 $(cat /run/clearml 2>/dev/null) && echo \"system is now spinning down - "
            "it might take a minute if we need to upload the workspace:\""
            " && for ((i=180; i>=0; i--)); do echo -n \" .\"; sleep 1; done",
            ""
        ]

        if not self._monitor_process:
            return

        shutdown_cmd_directory = None
        for p in os.environ.get("PATH", "/usr/bin").split(os.pathsep):
            # default is first folder
            if not shutdown_cmd_directory:
                shutdown_cmd_directory = Path(p)
                if not shutdown_cmd_directory.is_dir():
                    shutdown_cmd_directory = None
                    continue
            # noinspection PyBroadException
            try:
                (Path(p) / "shutdown").unlink()
                # the first we found is enough, we will use it
                shutdown_cmd_directory = Path(p)
                break
            except Exception:
                pass

        try:
            with open("/run/clearml", "wt") as f:
                f.write("{}".format(self._monitor_process.pid))
        except Exception as ex:
            print("WARNING: Failed to write to run pid: {}".format(ex))

        if not shutdown_cmd_directory:
            # none of the PATH entries is an existing folder, nowhere to put the command
            print("WARNING: Failed to write to shutdown cmd {}: {}".format(None, "no valid PATH folder"))
            return

        # resolved before the `try` so the warning below can always report it
        shutdown_cmd = shutdown_cmd_directory / "shutdown"
        try:
            with open(shutdown_cmd, "wt") as f:
                f.write("\n".join(batch_command))
            os.chmod(shutdown_cmd.as_posix(), 0o755)
        except Exception as ex:
            print("WARNING: Failed to write to shutdown cmd {}: {}".format(shutdown_cmd, ex))

    def _stdout__patched__write__(self, is_stderr, *args, **kwargs):
        """
        Replacement for stdout/stderr `write`: call the original write, then mirror the
        message into the reply pipe (if one is open).

        Messages without a new line are suffixed with "NEOL\\n", so the reading side
        can read line by line and still print the message without the line break.

        :param is_stderr: True if this call replaces stderr.write, False for stdout.write
        :return: the return value of the original write function
        """
        write_func = self._original_stderr_write if is_stderr else self._original_stdout_write
        ret = write_func(*args, **kwargs)  # noqa

        if self._fd:
            message = args[0] if len(args) > 0 else None
            if message is not None:
                try:
                    message = str(message)
                    if "\n" not in message:
                        message = message + "NEOL\n"
                    self._fd.write(message.encode())
                    self._fd.flush()
                except Exception as ex:
                    self._original_stderr_write("WARNING: failed sending stdout over pipe: {}\n".format(ex))

        return ret

    def _patch_stdout(self):
        """Replace sys.stdout/sys.stderr `write` with the mirroring version."""
        self._original_stdout_write = sys.stdout.write
        self._original_stderr_write = sys.stderr.write
        sys.stdout.write = partial(self._stdout__patched__write__, False,)
        sys.stderr.write = partial(self._stdout__patched__write__, True,)

    def _restore_stdout(self):
        """
        Put back the original sys.stdout/sys.stderr `write` functions.

        Safe to call when nothing was patched: we never assign None as the write function
        (that would break every following print call).
        """
        if self._original_stdout_write is not None:
            sys.stdout.write = self._original_stdout_write
        if self._original_stderr_write is not None:
            sys.stderr.write = self._original_stderr_write
|
||||
|
||||
|
||||
def _sync_cmd_function():
    # Client side of the workspace sync request.
    # Notice: the source code of this function is copied into a standalone script,
    # with the pipe names and the magic attribute references textually replaced by their
    # values, so it must stay self-contained and keep referring to them in that exact form.
    # this is where we put all the imports and the sync call back
    import os
    import sys
    print("Storing workspace to persistent storage")
    try:
        if not os.path.exists(SyncCallback.pipe_file_name_c):
            os.mkfifo(SyncCallback.pipe_file_name_c, 0o644)
    except Exception as ex:
        print("ERROR: Failed creating request pipe {}".format(ex))

    # push the request
    try:
        with open(SyncCallback.pipe_file_name_c, 'wt') as pipe:
            cmd = "{}:sync".format(SyncCallback.magic)
            pipe.write(cmd)
    except Exception as ex:
        print("ERROR: Failed sending sync request {}".format(ex))
        # the request was never delivered, waiting for a reply would block forever
        return

    # while we did not get EOF
    try:
        # Read the result from the server
        with open(SyncCallback.pipe_file_name_r, 'rb') as pipe:
            while True:
                result = pipe.readline().decode()

                if not result:
                    # the writing side closed the pipe without the EOF marker,
                    # from here on readline() only returns empty strings
                    print("ERROR: Failed reading sync request result {}".format("reply ended unexpectedly"))
                    break

                # "NEOL" marks a message that was printed without a line break
                if result.endswith("NEOL\n"):
                    result = result[:-5]

                # read from fd
                if "EOF" in result.split("\n"):
                    sys.stdout.write(result.replace("EOF\n", "\n"))
                    print("Workspace synced successfully")
                    break
                sys.stdout.write(result)
                # partial lines (no line break) should show up immediately
                sys.stdout.flush()

    except Exception as ex:
        print("ERROR: Failed reading sync request result {}".format(ex))
|
||||
|
||||
|
||||
def main():
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
Task.set_resource_monitor_iteration_timeout(
|
||||
seconds_from_start=1,
|
||||
wait_for_first_iteration_to_start_sec=1, # noqa
|
||||
max_wait_for_first_iteration_to_start_sec=1 # noqa
|
||||
) # noqa
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
param = {
|
||||
"user_base_directory": "~/",
|
||||
"ssh_server": True,
|
||||
@ -1289,6 +1671,9 @@ def main():
|
||||
|
||||
run_user_init_script(task)
|
||||
|
||||
# notice this function will call EXIT if we do not have access rights to the output storage
|
||||
verify_workspace_storage_access(store_workspace=param.get("store_workspace"), task=task)
|
||||
|
||||
# restore workspace if exists
|
||||
# notice, if "store_workspace" is not set we will Not restore the workspace
|
||||
try:
|
||||
@ -1321,11 +1706,22 @@ def main():
|
||||
|
||||
start_vscode_server(hostname, hostnames, param, task, env)
|
||||
|
||||
start_jupyter_server(hostname, hostnames, param, task, env)
|
||||
# Notice we can only monitor the jupyter server because the vscode/ssh do not have "quit" interface
|
||||
# we add `shutdown` command below
|
||||
jupyter_process = start_jupyter_server(hostname, hostnames, param, task, env)
|
||||
|
||||
print('We are done - sync workspace if needed')
|
||||
syncer = SyncCallback(
|
||||
sync_function=partial(sync_workspace_snapshot, task, param, False),
|
||||
monitor_process=jupyter_process,
|
||||
workspace_dir=param.get("store_workspace")
|
||||
)
|
||||
syncer.init()
|
||||
|
||||
sync_workspace_snapshot(task, param)
|
||||
# notice this will end when process is done
|
||||
syncer.wait_on_process(run_background_sync_thread=True, call_sync_callback_on_return=True)
|
||||
|
||||
print('We are done')
|
||||
# no need to sync the process, syncer.wait_on_process already did that
|
||||
|
||||
# sync back python packages for next time
|
||||
# TODO: sync python environment
|
||||
|
Loading…
Reference in New Issue
Block a user