From a621b4fa20020d97e939f7c4b9f84a0a621d7d34 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 22 Dec 2022 22:11:43 +0200 Subject: [PATCH] Add pipeline decorator argument to control docker image (#856) --- clearml/automation/controller.py | 106 +++++++++++++++++++++++-- clearml/backend_interface/task/task.py | 50 +++++++++++- clearml/datasets/dataset.py | 4 +- clearml/task.py | 35 ++++++++ 4 files changed, 186 insertions(+), 9 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index cedbbf75..a8445753 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -137,6 +137,13 @@ class PipelineController(object): abort_on_failure=False, # type: bool add_run_number=True, # type: bool retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + packages=None, # type: Optional[Union[str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None # type: Optional[str] ): # type: (...) -> None """ @@ -178,6 +185,23 @@ class PipelineController(object): print(node.name, ' failed') # allow up to 5 retries (total of 6 runs) return retries < 5 + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param packages: Manually specify a list of required packages or a local requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" + If not provided, packages are automatically added. + :param repo: Optional, specify a repository to attach to the pipeline controller, when remotely executing. 
+ Allow users to execute the controller inside the specified repository, enabling them to load modules/scripts from the repository. Notice the execution work directory will be the repository root folder. Supports both git repo url link, and local repository path (automatically converted into the remote + git/commit that is currently checked out). + Example remote url: 'https://github.com/user/repo.git' + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy + :param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used) """ self._nodes = {} self._running_nodes = [] @@ -236,6 +260,11 @@ class PipelineController(object): self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) self._task.set_user_properties(version=self._version) + self._task.set_base_docker( + docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script + ) + self._task.set_packages(packages) + self._task.set_repo(repo, branch=repo_branch, commit=repo_commit) self._auto_connect_task = bool(self._task) # make sure we add to the main Task the pipeline tag if self._task and not self._pipeline_as_sub_project: @@ -974,7 +1003,7 @@ class PipelineController(object): if mark_failed: self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True) elif mark_aborted: - self._task.mark_stopped(status_reason='Pipeline aborted', force=True) + self._task.mark_stopped(status_message='Pipeline aborted', force=True) elif self._pipeline_task_status_failed: print('Setting pipeline controller Task as failed (due to failed steps) !') self._task.mark_failed(status_reason='Pipeline step failed', force=True) @@ -2872,6 +2901,13 @@ class PipelineDecorator(PipelineController): abort_on_failure=False, # type: bool 
add_run_number=True, # type: bool retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + packages=None, # type: Optional[Union[str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None # type: Optional[str] ): # type: (...) -> () """ @@ -2909,7 +2945,23 @@ class PipelineDecorator(PipelineController): print(node.name, ' failed') # allow up to 5 retries (total of 6 runs) return retries < 5 - + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param packages: Manually specify a list of required packages or a local requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" + If not provided, packages are automatically added. + :param repo: Optional, specify a repository to attach to the pipeline controller, when remotely executing. + Allow users to execute the controller inside the specified repository, enabling them to load modules/scripts + from the repository. Notice the execution work directory will be the repository root folder. + Supports both git repo url link, and local repository path (automatically converted into the remote + git/commit that is currently checked out). 
+ Example remote url: 'https://github.com/user/repo.git' + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy + :param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used) """ super(PipelineDecorator, self).__init__( name=name, @@ -2921,6 +2973,13 @@ class PipelineDecorator(PipelineController): abort_on_failure=abort_on_failure, add_run_number=add_run_number, retry_on_failure=retry_on_failure, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + packages=packages, + repo=repo, + repo_branch=repo_branch, + repo_commit=repo_commit ) # if we are in eager execution, make sure parent class knows it @@ -3595,7 +3654,14 @@ class PipelineDecorator(PipelineController): add_run_number=True, # type: bool args_map=None, # type: dict[str, List[str]] start_controller_locally=False, # type: bool - retry_on_failure=None # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + packages=None, # type: Optional[Union[str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None # type: Optional[str] ): # type: (...) 
-> Callable """ @@ -3662,7 +3728,23 @@ class PipelineDecorator(PipelineController): print(node.name, ' failed') # allow up to 5 retries (total of 6 runs) return retries < 5 - + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param packages: Manually specify a list of required packages or a local requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" + If not provided, packages are automatically added based on the imports used in the function. + :param repo: Optional, specify a repository to attach to the function, when remotely executing. + Allow users to execute the function inside the specified repository, enabling them to load modules/scripts + from the repository. Notice the execution work directory will be the repository root folder. + Supports both git repo url link, and local repository path (automatically converted into the remote + git/commit that is currently checked out). 
+ Example remote url: 'https://github.com/user/repo.git' + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy + :param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used) """ def decorator_wrap(func): @@ -3700,6 +3782,13 @@ class PipelineDecorator(PipelineController): abort_on_failure=abort_on_failure, add_run_number=add_run_number, retry_on_failure=retry_on_failure, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + packages=packages, + repo=repo, + repo_branch=repo_branch, + repo_commit=repo_commit ) ret_val = func(**pipeline_kwargs) LazyEvalWrapper.trigger_all_remote_references() @@ -3741,7 +3830,14 @@ class PipelineDecorator(PipelineController): target_project=target_project, abort_on_failure=abort_on_failure, add_run_number=add_run_number, - retry_on_failure=retry_on_failure + retry_on_failure=retry_on_failure, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + packages=packages, + repo=repo, + repo_branch=repo_branch, + repo_commit=repo_commit ) a_pipeline._args_map = args_map or {} diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 7a55d66d..0596c9d0 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -276,7 +276,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): # add ide info into task runtime_properties # noinspection PyBroadException try: - self._set_runtime_properties(runtime_properties={"IDE": result.script["ide"]}) + self._set_runtime_properties(runtime_properties={"ide": result.script["ide"]}) except Exception as ex: self.log.info("Failed logging ide information: {}".format(ex)) @@ -1396,6 +1396,52 @@ class Task(IdObjectBase, 
AccessMixin, SetupUploadMixin): execution.docker_cmd = image + (' {}'.format(arguments) if arguments else '') self._edit(execution=execution) + def set_packages(self, packages): + # type: (Union[str, Sequence[str]]) -> () + """ + Manually specify a list of required packages or a local requirements.txt file. + + :param packages: The list of packages or the path to the requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" + """ + if not packages: + return + if not isinstance(packages, str) or not os.path.exists(packages): + # noinspection PyProtectedMember + self._update_requirements(packages) + return + with open(packages) as f: + # noinspection PyProtectedMember + self._update_requirements([line.strip() for line in f.readlines()]) + + def set_repo(self, repo, branch=None, commit=None): + # type: (str, Optional[str], Optional[str]) -> () + """ + Specify a repository to attach to the task. + Allow users to execute the task inside the specified repository, enabling them to load modules/scripts + from the repository. Notice the execution work directory will be the repository root folder. + Supports both git repo url link, and local repository path (automatically converted into the remote + git/commit that is currently checked out). + Example remote url: 'https://github.com/user/repo.git'. + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy. 
+ + :param repo: Remote URL for the repository to use, OR path to local copy of the git repository + Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo' + :param branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param commit: Optional, specify the repository commit id (Ignored, if local repo path is used) + """ + if not repo: + return + with self._edit_lock: + self.reload() + self.data.script.repository = repo + if branch: + self.data.script.branch = branch + if commit: + self.data.script.version_num = commit + self._edit(script=self.data.script) + def get_base_docker(self): # type: () -> str """Get the base Docker command (image) that is set for this experiment.""" @@ -2330,7 +2376,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): return res def _update_requirements(self, requirements): - # type: (Union[dict, str]) -> () + # type: (Union[dict, str, Sequence[str]]) -> () if not isinstance(requirements, dict): requirements = {'pip': requirements} diff --git a/clearml/datasets/dataset.py b/clearml/datasets/dataset.py index d293f7bb..80302dd5 100644 --- a/clearml/datasets/dataset.py +++ b/clearml/datasets/dataset.py @@ -801,7 +801,7 @@ class Dataset(object): return True def set_metadata(self, metadata, metadata_name='metadata', ui_visible=True): - # type: (Union[numpy.array, pd.DataFrame, Dict[str, Any]], str, bool) -> () + # type: (Union[numpy.array, pd.DataFrame, Dict[str, Any]], str, bool) -> () # noqa: F821 """ Attach a user-defined metadata to the dataset. Check `Task.upload_artifact` for supported types. If type is Optionally make it visible as a table in the UI. @@ -821,7 +821,7 @@ class Dataset(object): ) def get_metadata(self, metadata_name='metadata'): - # type: (str) -> Optional[numpy.array, pd.DataFrame, dict, str, bool] + # type: (str) -> Optional[numpy.array, pd.DataFrame, dict, str, bool] # noqa: F821 """ Get attached metadata back in its original format. 
Will return None if none was found. """ diff --git a/clearml/task.py b/clearml/task.py index d612a6cb..36d6c305 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -2400,6 +2400,41 @@ class Task(_Task): docker_setup_bash_script=docker_setup_bash_script ) + def set_packages(self, packages): + # type: (Union[str, Sequence[str]]) -> () + """ + Manually specify a list of required packages or a local requirements.txt file. + When running remotely the call is ignored + + :param packages: The list of packages or the path to the requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" + """ + if running_remotely(): + return + super(Task, self).set_packages(packages) + + def set_repo(self, repo, branch=None, commit=None): + # type: (str, Optional[str], Optional[str]) -> () + """ + Specify a repository to attach to the task. + Allow users to execute the task inside the specified repository, enabling them to load modules/scripts + from the repository. Notice the execution work directory will be the repository root folder. + Supports both git repo url link, and local repository path (automatically converted into the remote + git/commit that is currently checked out). + Example remote url: 'https://github.com/user/repo.git'. + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy. 
+ When executing remotely, this call will not override the repository data (it is ignored) + + :param repo: Remote URL for the repository to use, OR path to local copy of the git repository + Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo' + :param branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param commit: Optional, specify the repository commit id (Ignored, if local repo path is used) + """ + if running_remotely(): + return + super(Task, self).set_repo(repo, branch=branch, commit=commit) + def set_resource_monitor_iteration_timeout(self, seconds_from_start=1800): # type: (float) -> bool """