diff --git a/clearml_session/__main__.py b/clearml_session/__main__.py index 330d3f2..631d3fb 100644 --- a/clearml_session/__main__.py +++ b/clearml_session/__main__.py @@ -189,16 +189,21 @@ def create_base_task(state, project_name=None, task_name=None, continue_task_id= task_script = task.data.script.to_dict() base_script_file = os.path.abspath(os.path.join(__file__, '..', 'tcp_proxy.py')) with open(base_script_file, 'rt') as f: - task_script['diff'] = f.read() + # notice lines always end with \n + task_script['diff'] = "".join([line for line in f.readlines() if not line.lstrip().startswith("#")]) base_script_file = os.path.abspath(os.path.join(__file__, '..', 'interactive_session_task.py')) with open(base_script_file, 'rt') as f: - task_script['diff'] += '\n\n' + f.read() + task_script['diff'] += "\n\n" + # notice lines always end with \n + task_script['diff'] += "".join([line for line in f.readlines() if not line.lstrip().startswith("#")]) task_script['working_dir'] = '.' task_script['entry_point'] = '.interactive_session.py' task_script['requirements'] = {'pip': '\n'.join( ["clearml>=1.1.5"] + (["jupyter", "jupyterlab", "jupyterlab_git", "traitlets"] if state.get('jupyter_lab') else []) + + (["boto3>=1.9", "azure-storage-blob>=12.0.0", "google-cloud-storage>=1.13.2"] + if not state.get('disable_storage_packages') else []) + \ (['pylint'] if state.get('vscode_server') else []))} section, _, _ = _get_config_section_name() @@ -267,7 +272,7 @@ def create_debugging_task(state, debug_task_id, task_name=None, task_project_id= base_script_file = os.path.abspath(os.path.join(__file__, '..', 'interactive_session_task.py')) with open(base_script_file, 'rt') as f: - entry_diff = ['+'+line.rstrip() for line in f.readlines()] + entry_diff = ['+'+line.rstrip() for line in f.readlines() if not line.lstrip().startswith("#")] entry_diff_header = \ "diff --git a/__interactive_session__.py b/__interactive_session__.py\n" \ "--- a/__interactive_session__.py\n" \ @@ -281,6 +286,8 @@ def create_debugging_task(state, debug_task_id, task_name=None, task_project_id= state['packages'] = \ (state.get('packages') or []) + ["clearml"] + \ (["jupyter", "jupyterlab", "jupyterlab_git", "traitlets"] if state.get('jupyter_lab') else []) + \ + (["boto3>=1.9", "azure-storage-blob>=12.0.0", "google-cloud-storage>=1.13.2"] + if not state.get('disable_storage_packages') else []) + \ (['pylint'] if state.get('vscode_server') else []) task.update_task(task_state) section, _, _ = _get_config_section_name() @@ -420,9 +427,10 @@ def _get_running_tasks(client, prev_task_id): 'page_size': 10, 'page': 0, 'order_by': ['-last_update'], 'user': [current_user_id], - 'only_fields': ['id', 'created', 'parent'] + 'only_fields': ['id', 'created', 'parent', 'status_message'] }) - tasks_id_created = [(t.id, t.created, t.parent) for t in previous_tasks] + tasks_id_created = [(t.id, t.created, t.parent) for t in previous_tasks + if "stopping" not in (t.status_message or "")] if prev_task_id and prev_task_id not in (t[0] for t in tasks_id_created): # manually check the last task.id try: @@ -431,13 +439,13 @@ def _get_running_tasks(client, prev_task_id): 'id': [prev_task_id], 'page_size': 10, 'page': 0, 'order_by': ['-last_update'], - 'only_fields': ['id', 'created', 'parent'] + 'only_fields': ['id', 'created', 'parent', 'status_message'] }) except APIError: # we could not find previous task, nothing to worry about. prev_tasks = None - if prev_tasks: + if prev_tasks and "stopping" not in (prev_tasks[0].status_message or ""): tasks_id_created += [(prev_tasks[0].id, prev_tasks[0].created, prev_tasks[0].parent)] return tasks_id_created @@ -1462,6 +1470,10 @@ def setup_parser(parser): parser.add_argument('--force-dropbear', default=None, nargs='?', const='true', metavar='true/false', type=lambda x: (str(x).strip().lower() in ('true', 'yes')), help='Force using `dropbear` instead of SSHd') + parser.add_argument('--disable-storage-packages', default=None, nargs='?', const='true', metavar='true/false', + type=lambda x: (str(x).strip().lower() in ('true', 'yes')), + help='If True automatic boto3/azure-storage-blob/google-cloud-storage python ' + 'packages will not be added, you can manually add them using --packages') parser.add_argument('--disable-store-defaults', action='store_true', default=None, help='If set, do not store current setup as new default configuration') parser.add_argument('--disable-fingerprint-check', action='store_true', default=None, diff --git a/clearml_session/interactive_session_task.py b/clearml_session/interactive_session_task.py index d7e7249..d99ba43 100644 --- a/clearml_session/interactive_session_task.py +++ b/clearml_session/interactive_session_task.py @@ -9,8 +9,10 @@ from copy import deepcopy import getpass from functools import partial from tempfile import mkstemp, gettempdir, mkdtemp +from threading import Thread from time import sleep, time -from datetime import datetime +import datetime +from uuid import uuid4 import psutil import requests @@ -261,18 +263,23 @@ def monitor_jupyter_server(fd, local_filename, process, task, jupyter_port, host task.set_parameter(name='properties/jupyter_url', value=jupyter_url) print('\nJupyter Lab URL: {}\n'.format(jupyter_url)) + # if we got here, we have a token and we can leave + break + # skip cleanup the process is still running # cleanup - # noinspection PyBroadException - try: - os.close(fd) - except Exception: - pass - # noinspection PyBroadException - try: - os.unlink(local_filename) - except Exception: - pass + # # noinspection PyBroadException + # try: + # os.close(fd) + # except Exception: + # pass + # # noinspection PyBroadException + # try: + # os.unlink(local_filename) + # except Exception: + # pass + + return process def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.1", port=None): @@ -281,8 +288,8 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0. # get vscode version and python extension version # they are extremely flaky, this combination works, most do not. - vscode_version = '4.96.2' - python_ext_version = '2024.22.1' + vscode_version = '4.99.2' + python_ext_version = '2025.4.0' if param.get("vscode_version"): vscode_version_parts = param.get("vscode_version").split(':') vscode_version = vscode_version_parts[0] @@ -301,9 +308,27 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0. # example of CLEARML_SESSION_VSCODE_SERVER_DEB value # 'https://github.com/coder/code-server/releases/download/v4.96.2/code-server_4.96.2_amd64.deb' # (see https://github.com/coder/code-server/releases) + code_server_deb_download_link = \ os.environ.get("CLEARML_SESSION_VSCODE_SERVER_DEB") or \ - 'https://github.com/coder/code-server/releases/download/v{version}/code-server_{version}_amd64.deb' + 'https://github.com/coder/code-server/releases/download/v{version}/code-server_{version}' + + # support x86/arm dnf/deb + if not code_server_deb_download_link.endswith(".deb") and not code_server_deb_download_link.endswith(".rpm"): + import platform + if "arm" in (platform.processor() or "").lower(): + code_server_deb_download_link += "_arm64" + else: + code_server_deb_download_link += "_amd64" + + if shutil.which("dnf") and not shutil.which("apt"): + code_server_deb_download_link += ".rpm" + parts = code_server_deb_download_link.split("/") + # rpm links with "-" instead of "_" in the rpm filename + parts[-1] = parts[-1].replace("_", "-") + code_server_deb_download_link = "/".join(parts) + else: + code_server_deb_download_link += ".deb" pre_installed = False python_ext = None @@ -315,7 +340,7 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0. # check if preinstalled # noinspection PyBroadException try: - vscode_path = subprocess.check_output('which code-server', shell=True).decode().strip() + vscode_path = shutil.which("code-server") pre_installed = bool(vscode_path) except Exception: vscode_path = None @@ -329,7 +354,10 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0. code_server_deb = StorageManager.get_local_copy( code_server_deb_download_link.format(version=vscode_version), extract_archive=False) - os.system("dpkg -i {}".format(code_server_deb)) + if shutil.which("dnf") and not shutil.which("dpkg"): + os.system("dnf install -y {}".format(code_server_deb)) + else: + os.system("dpkg -i {}".format(code_server_deb)) except Exception as ex: print("Failed installing vscode server: {}".format(ex)) return @@ -340,7 +368,7 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0. # check if code-server exists # noinspection PyBroadException try: - vscode_path = subprocess.check_output('which code-server', shell=True).decode().strip() + vscode_path = shutil.which("code-server") assert vscode_path except Exception: print('Error: Cannot install code-server (not root) and could not find code-server executable, skipping.') @@ -494,10 +522,7 @@ def start_vscode_server(hostname, hostnames, param, task, env, bind_ip="127.0.0. def start_jupyter_server(hostname, hostnames, param, task, env, bind_ip="127.0.0.1", port=None): if not param.get('jupyterlab', True): - print('no jupyterlab to monitor - going to sleep') - while True: - sleep(10.) - return # noqa + return # execute jupyter notebook fd, local_filename = mkstemp() @@ -618,13 +643,14 @@ def setup_ssh_server(hostname, hostnames, param, task, env): # noinspection SpellCheckingInspection os.system( "export PYTHONPATH=\"\" && " - "([ ! -z $(which sshd) ] || " - "(apt-get update ; DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server)) && " + "([ ! -z $(command -v sshd) ] || " + "(DEBIAN_FRONTEND=noninteractive apt-get update ; DEBIAN_FRONTEND=noninteractive apt-get install -y openssh-server) || " + "(dnf install -y openssh-server)) && " "mkdir -p /var/run/sshd && " "echo 'root:{password}' | chpasswd && " "echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config && " - "sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config && " - "sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd " + "sed -i 's|PermitRootLogin prohibit-password|PermitRootLogin yes|g' /etc/ssh/sshd_config && " + "sed -i 's|session\\s*required\\s*pam_loginuid.so|session optional pam_loginuid.so|g' /etc/pam.d/sshd " "&& " # noqa: W605 "echo 'ClientAliveInterval 10' >> /etc/ssh/sshd_config && " "echo 'ClientAliveCountMax 20' >> /etc/ssh/sshd_config && " @@ -633,10 +659,10 @@ def setup_ssh_server(hostname, hostnames, param, task, env): 'echo "export VISIBLE=now" >> /etc/profile && ' 'echo "export PATH=$PATH" >> /etc/profile && ' 'echo "ldconfig" 2>/dev/null >> /etc/profile && ' - 'echo "export CLEARML_CONFIG_FILE={trains_config_file}" >> /etc/profile'.format( + 'echo "export CLEARML_CONFIG_FILE={clearml_config_file}" >> /etc/profile'.format( password=ssh_password, port=port, - trains_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"), + clearml_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"), ) ) sshd_path = '/usr/sbin/sshd' @@ -646,8 +672,10 @@ def setup_ssh_server(hostname, hostnames, param, task, env): # check if sshd exists # noinspection PyBroadException try: - os.system('echo "export CLEARML_CONFIG_FILE={trains_config_file}" >> $HOME/.profile'.format( - trains_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"), + os.system( + 'echo "export PATH=$PATH" >> $HOME/.profile && ' + 'echo "export CLEARML_CONFIG_FILE={clearml_config_file}" >> $HOME/.profile'.format( + clearml_config_file=os.environ.get("CLEARML_CONFIG_FILE") or os.environ.get("TRAINS_CONFIG_FILE"), )) except Exception: print("warning failed setting ~/.profile") @@ -1087,7 +1115,7 @@ def run_user_init_script(task): os.environ['CLEARML_DOCKER_BASH_SCRIPT'] = str(init_script) -def _sync_workspace_snapshot(task, param): +def _sync_workspace_snapshot(task, param, auto_shutdown_task): workspace_folder = param.get("store_workspace") if not workspace_folder: # nothing to do @@ -1142,12 +1170,14 @@ def _sync_workspace_snapshot(task, param): archive_preview += '{} - {:,} B\n'.format(relative_file_name, filename.stat().st_size) # upload actual snapshot tgz + timestamp = datetime.datetime.now(datetime.UTC) \ + if hasattr(datetime, "UTC") and hasattr(datetime.datetime, "now") else datetime.datetime.utcnow() task.upload_artifact( name=artifact_workspace_name, artifact_object=Path(local_gzip), delete_after_upload=True, preview=archive_preview, - metadata={"timestamp": str(datetime.utcnow()), sync_workspace_creating_id: task.id}, + metadata={"timestamp": str(timestamp), sync_workspace_creating_id: task.id}, wait_on_upload=True, retries=3 ) @@ -1185,26 +1215,27 @@ def _sync_workspace_snapshot(task, param): param["workspace_hash"] = workspace_hash # noinspection PyProtectedMember task._set_runtime_properties(runtime_properties={sync_runtime_property: time()}) - print("[{}] Workspace '{}' snapshot synced".format(datetime.utcnow(), workspace_folder)) + print("[{}] Workspace '{}' snapshot synced".format(timestamp, workspace_folder)) except Exception as ex: print("ERROR: Failed syncing workspace [{}]: {}".format(workspace_folder, ex)) finally: - if prev_status in ("failed", ): - task.mark_failed(force=True, status_message="workspace shutdown sync completed") - elif prev_status in ("completed", ): - task.mark_completed(force=True, status_message="workspace shutdown sync completed") - else: - task.mark_stopped(force=True, status_message="workspace shutdown sync completed") + if auto_shutdown_task: + if prev_status in ("failed", ): + task.mark_failed(force=True, status_message="workspace shutdown sync completed") + elif prev_status in ("completed", ): + task.mark_completed(force=True, status_message="workspace shutdown sync completed") + else: + task.mark_stopped(force=True, status_message="workspace shutdown sync completed") -def sync_workspace_snapshot(task, param): +def sync_workspace_snapshot(task, param, auto_shutdown_task=True): __poor_lock.append(time()) if len(__poor_lock) != 1: # someone is already in, we should leave __poor_lock.pop(-1) try: - return _sync_workspace_snapshot(task, param) + return _sync_workspace_snapshot(task, param, auto_shutdown_task=auto_shutdown_task) finally: __poor_lock.pop(-1) @@ -1261,7 +1292,358 @@ def restore_workspace(task, param): return workspace_folder +def verify_workspace_storage_access(store_workspace, task): + # notice this function will call EXIT if we do not have access rights to the output storage + if not store_workspace: + return True + + # check that we have credentials to upload the artifact, + try: + original_output_uri = task.output_uri + task.output_uri = task.output_uri or True + task.output_uri = original_output_uri + except ValueError as ex: + print( + "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n" + "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n" + "Error!\n" + " `store_workspace` requested but target storage is not accessible!\n" + " \n" + " Storage configuration server error - could not store working session\n" + " If you are using GS/Azure/S3 (or compatible) make sure to\n" + " 1. Verify your credentials in the ClearML Vault\n" + " 2. Add the relevant python package to the session:\n" + " Azure: azure-storage-blob>=12.0.0\n" + " GS: google-cloud-storage>=1.13.2\n" + " S3: boto3>=1.9\n" + "Exception:\n" + f"{ex}\n" + "*************************************************************************\n" + "*************************************************************************\n" + ) + # do not throw the exception itself, because it will confuse readers + exit(1) + +class SyncCallback: + pipe_file_name_c = "/tmp/clearml_sync_pipe_c" + pipe_file_name_r = "/tmp/clearml_sync_pipe_r" + magic = "DEFAULT" + cmd_file = "clearml-sync-workspace" + _original_stdout_write = None + _original_stderr_write = None + ssh_banner = [ + "#!/bin/bash", + "echo \"\"", + "echo \"ClearML-Session:\"", + "echo \" * Workspace at {workspace_dir} will be automatically synced at the end of the session, " + "or manually by running '{clearml_sync_cmd}' command\"", + "echo \" * Close session from the web UI or by running 'shutdown' command as root.\"", + "echo \"\"", + "", + ] + singleton = None + + def __init__( + self, + sync_function: callable = None, + monitor_process: subprocess.Popen = None, + workspace_dir: str = None + ): + self.magic = str(uuid4()) + self._fd = None + self._sync_func = sync_function + self._monitor_process = monitor_process + self._workspace_dir = workspace_dir + SyncCallback.singleton = self + + def init(self): + try: + if self._sync_func: + self._create_sync_object() + self._write_sync_cmd_file() + + self._create_monitor_process() + self._shutdown_cmd() + + self._write_ssh_banner() + except Exception as ex: + print("Failed to create sync object: {}".format(ex)) + + def background_sync_thread(self) -> None: + if not self._sync_func: + return + + while True: + try: + # Open the pipe for reading + with open(self.pipe_file_name_c, 'rt') as pipe: + command = pipe.read().strip() + if not command or command.split(":", 1)[0] != self.magic: + continue + command = command.split(":", 1)[-1] + if not command: + continue + + print(f"Received command: {command}") + if not os.path.exists(self.pipe_file_name_r): + os.mkfifo(self.pipe_file_name_r, 0o644) + + with open(self.pipe_file_name_r, 'wb') as pipe_out: + self._fd = pipe_out + # so that we push all our prints + self._patch_stdout() + + try: + self._sync_func() + except Exception as ex: + print("WARNING: sync callback failed [{}]: {}".format(self._sync_func, ex)) + + pipe_out.write("\nEOF\n".encode()) + + # so that we push all our prints + self._restore_stdout() + + self._fd = None + + except Exception as ex: + self._restore_stdout() + print("Exception occurred while waiting for sync request: {}".format(ex)) + print("Waiting for 60 seconds...") + sleep(60) + + # maybe we will get here + os.remove(self.pipe_file_name_r) + os.remove(self.pipe_file_name_c) + + def wait_on_process(self, run_background_sync_thread=True, call_sync_callback_on_return=True): + if not self._monitor_process: + # if we do not have a process to wait just call the sync background + if run_background_sync_thread: + self.background_sync_thread() + else: + # start background thread + if run_background_sync_thread: + Thread(target=self.background_sync_thread, daemon=True).start() + # wait on process + self._monitor_process.wait() + + if call_sync_callback_on_return and self._sync_func: + self._sync_func() + + def _create_sync_object(self) -> object: + # Create the named pipe if it doesn't exist + if not os.path.exists(self.pipe_file_name_c): + os.mkfifo(self.pipe_file_name_c, 0o644) + if not os.path.exists(self.pipe_file_name_r): + os.mkfifo(self.pipe_file_name_r, 0o644) + + def _create_monitor_process(self): + if self._monitor_process: + return + sleep_cmd = shutil.which("sleep") + self._monitor_process = subprocess.Popen([sleep_cmd, "999d"], shell=False) + + def _write_sync_cmd_file(self): + import inspect + source_function = inspect.getsource(_sync_cmd_function) + source_function = "#!{}\n\n".format(sys.executable) + source_function + source_function = source_function.replace("SyncCallback.pipe_file_name_c", + "\"{}\"".format(self.pipe_file_name_c)) + source_function = source_function.replace("SyncCallback.pipe_file_name_r", + "\"{}\"".format(self.pipe_file_name_r)) + source_function = source_function.replace("SyncCallback.magic", "\"{}\"".format(self.magic)) + source_function += "\nif __name__ == \"__main__\":\n {}()\n".format("_sync_cmd_function") + # print("source_function:\n```\n{}\n```".format(source_function)) + + full_path = None + for p in os.environ.get("PATH", "/usr/bin").split(os.pathsep): + # noinspection PyBroadException + try: + if not Path(p).is_dir(): + continue + full_path = Path(p) / self.cmd_file + full_path.touch(exist_ok=True) + break + except Exception as ex: + pass + + if not full_path: + print("ERROR: Failed to create sync execution cmd") + return + + print("Creating sync command in: {}".format(full_path)) + + try: + with open(full_path, "wt") as f: + f.write(source_function) + os.chmod(full_path, 0o777) + except Exception as ex: + print("ERROR: Failed to create sync execution cmd {}: {}".format(full_path, ex)) + + def _write_ssh_banner(self): + banner_file = Path("/etc/update-motd.d/") + make_exec = False + if banner_file.is_dir(): + banner_file = banner_file / "99-clearml" + make_exec = True + else: + banner_file = Path("/etc/profile").expanduser() + + # noinspection PyBroadException + try: + banner_file.touch(exist_ok=True) + except Exception: + banner_file = Path("~/.profile").expanduser() + # noinspection PyBroadException + try: + banner_file.touch(exist_ok=True) + except Exception: + print("WARNING: failed creating ssh banner") + return + + try: + with open(banner_file, "at") as f: + ssh_banner = self.ssh_banner + + # skip first `#!/bin/bash` line if this is not an executable, and add a new line instead + if not make_exec: + ssh_banner = [""] + ssh_banner[1:] + + f.write("\n".join(ssh_banner).format( + workspace_dir=self._workspace_dir, clearml_sync_cmd=self.cmd_file + )) + + if make_exec: + os.chmod(banner_file.as_posix(), 0o755) + + except Exception as ex: + print("WARNING: Failed to write to banner {}: {}".format(banner_file, ex)) + + def _shutdown_cmd(self): + batch_command = [ + "#!/bin/bash", + "[ \"$UID\" -ne 0 ] && echo \"shutdown: Permission denied. Try as root.\" && exit 1", + "[ ! -f /run/clearml ] && echo \"shutdown: failed.\" && exit 2", + "kill -9 $(cat /run/clearml 2>/dev/null) && echo \"system is now spinning down - " + "it might take a minute if we need to upload the workspace:\"" + " && for ((i=180; i>=0; i--)); do echo -n \" .\"; sleep 1; done", + "" + ] + + if not self._monitor_process: + return + + shutdown_cmd_directory = None + for p in os.environ.get("PATH", "/usr/bin").split(os.pathsep): + # default is first folder + if not shutdown_cmd_directory: + shutdown_cmd_directory = Path(p) + if not shutdown_cmd_directory.is_dir(): + shutdown_cmd_directory = None + continue + # noinspection PyBroadException + try: + (Path(p) / "shutdown").unlink() + # the first we found is enough, we will use it + shutdown_cmd_directory = Path(p) + break + except Exception: + pass + + try: + with open("/run/clearml", "wt") as f: + f.write("{}".format(self._monitor_process.pid)) + except Exception as ex: + print("WARNING: Failed to write to run pid: {}".format(ex)) + + try: + shutdown_cmd = shutdown_cmd_directory / "shutdown" + with open(shutdown_cmd, "wt") as f: + f.write("\n".join(batch_command)) + os.chmod(shutdown_cmd.as_posix(), 0o755) + except Exception as ex: + print("WARNING: Failed to write to shutdown cmd {}: {}".format(shutdown_cmd, ex)) + + def _stdout__patched__write__(self, is_stderr, *args, **kwargs): + write_func = self._original_stderr_write if is_stderr else self._original_stdout_write + ret = write_func(*args, **kwargs) # noqa + + if self._fd: + message = args[0] if len(args) > 0 else None + if message is not None: + try: + message = str(message) + if "\n" not in message: + message = message + "NEOL\n" + self._fd.write(message.encode()) + self._fd.flush() + except Exception as ex: + self._original_stderr_write("WARNING: failed sending stdout over pipe: {}\n".format(ex)) + + return ret + + def _patch_stdout(self): + self._original_stdout_write = sys.stdout.write + self._original_stderr_write = sys.stderr.write + sys.stdout.write = partial(self._stdout__patched__write__, False,) + sys.stderr.write = partial(self._stdout__patched__write__, True,) + + def _restore_stdout(self): + sys.stdout.write = self._original_stdout_write + sys.stderr.write = self._original_stderr_write + + +def _sync_cmd_function(): + # this is where we put all the imports and the sync call back + import os + import sys + print("Storing workspace to persistent storage") + try: + if not os.path.exists(SyncCallback.pipe_file_name_c): + os.mkfifo(SyncCallback.pipe_file_name_c, 0o644) + except Exception as ex: + print("ERROR: Failed creating request pipe {}".format(ex)) + + # push the request + try: + with open(SyncCallback.pipe_file_name_c, 'wt') as pipe: + cmd = "{}:sync".format(SyncCallback.magic) + pipe.write(cmd) + except Exception as ex: + print("ERROR: Failed sending sync request {}".format(ex)) + + # while we did not get EOF + try: + # Read the result from the server + with open(SyncCallback.pipe_file_name_r, 'rb') as pipe: + while True: + result = pipe.readline().decode() + + if result.endswith("NEOL\n"): + result = result[:-5] + + # read from fd + if "EOF" in result.split("\n"): + sys.stdout.write(result.replace("EOF\n", "\n")) + print("Workspace synced successfully") + break + sys.stdout.write(result) + + except Exception as ex: + print("ERROR: Failed reading sync request result {}".format(ex)) + + def main(): + # noinspection PyBroadException + try: + Task.set_resource_monitor_iteration_timeout( + seconds_from_start=1, + wait_for_first_iteration_to_start_sec=1, # noqa + max_wait_for_first_iteration_to_start_sec=1 # noqa + ) # noqa + except Exception: + pass + param = { "user_base_directory": "~/", "ssh_server": True, @@ -1289,6 +1671,9 @@ def main(): run_user_init_script(task) + # notice this function will call EXIT if we do not have access rights to the output storage + verify_workspace_storage_access(store_workspace=param.get("store_workspace"), task=task) + # restore workspace if exists # notice, if "store_workspace" is not set we will Not restore the workspace try: @@ -1321,11 +1706,22 @@ def main(): start_vscode_server(hostname, hostnames, param, task, env) - start_jupyter_server(hostname, hostnames, param, task, env) + # Notice we can only monitor the jupyter server because the vscode/ssh do not have "quit" interface + # we add `shutdown` command below + jupyter_process = start_jupyter_server(hostname, hostnames, param, task, env) - print('We are done - sync workspace if needed') + syncer = SyncCallback( + sync_function=partial(sync_workspace_snapshot, task, param, False), + monitor_process=jupyter_process, + workspace_dir=param.get("store_workspace") + ) + syncer.init() - sync_workspace_snapshot(task, param) + # notice this will end when process is done + syncer.wait_on_process(run_background_sync_thread=True, call_sync_callback_on_return=True) + + print('We are done') + # no need to sync the process, syncer.wait_on_process already did that # sync back python packages for next time # TODO: sync python environment