Add pipeline decorator argument to control docker image (#856)

This commit is contained in:
allegroai 2022-12-22 22:11:43 +02:00
parent 341aba2cee
commit a621b4fa20
4 changed files with 186 additions and 9 deletions

View File

@ -137,6 +137,13 @@ class PipelineController(object):
abort_on_failure=False, # type: bool abort_on_failure=False, # type: bool
add_run_number=True, # type: bool add_run_number=True, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
packages=None, # type: Optional[Union[str, Sequence[str]]]
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None # type: Optional[str]
): ):
# type: (...) -> None # type: (...) -> None
""" """
@ -178,6 +185,23 @@ class PipelineController(object):
print(node.name, ' failed') print(node.name, ' failed')
# allow up to 5 retries (total of 6 runs) # allow up to 5 retries (total of 6 runs)
return retries < 5 return retries < 5
:param docker: Select the docker image in which the remote session will be executed
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added.
:param repo: Optional, specify a repository to attach to the pipeline controller when executing remotely.
Allows users to execute the controller inside the specified repository, enabling them to load modules/scripts
from the repository. Notice that the execution working directory will be the repository root folder.
Supports both a git repo url link and a local repository path (automatically converted into the remote
git/commit as it is currently checked out).
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
:param repo_branch: Optional, specify the remote repository branch (ignored if a local repo path is used)
:param repo_commit: Optional, specify the repository commit id (ignored if a local repo path is used)
""" """
self._nodes = {} self._nodes = {}
self._running_nodes = [] self._running_nodes = []
@ -236,6 +260,11 @@ class PipelineController(object):
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
self._task.set_user_properties(version=self._version) self._task.set_user_properties(version=self._version)
self._task.set_base_docker(
docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script
)
self._task.set_packages(packages)
self._task.set_repo(repo, branch=repo_branch, commit=repo_commit)
self._auto_connect_task = bool(self._task) self._auto_connect_task = bool(self._task)
# make sure we add to the main Task the pipeline tag # make sure we add to the main Task the pipeline tag
if self._task and not self._pipeline_as_sub_project: if self._task and not self._pipeline_as_sub_project:
@ -974,7 +1003,7 @@ class PipelineController(object):
if mark_failed: if mark_failed:
self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True) self._task.mark_failed(status_reason='Pipeline aborted and failed', force=True)
elif mark_aborted: elif mark_aborted:
self._task.mark_stopped(status_reason='Pipeline aborted', force=True) self._task.mark_stopped(status_message='Pipeline aborted', force=True)
elif self._pipeline_task_status_failed: elif self._pipeline_task_status_failed:
print('Setting pipeline controller Task as failed (due to failed steps) !') print('Setting pipeline controller Task as failed (due to failed steps) !')
self._task.mark_failed(status_reason='Pipeline step failed', force=True) self._task.mark_failed(status_reason='Pipeline step failed', force=True)
@ -2872,6 +2901,13 @@ class PipelineDecorator(PipelineController):
abort_on_failure=False, # type: bool abort_on_failure=False, # type: bool
add_run_number=True, # type: bool add_run_number=True, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
packages=None, # type: Optional[Union[str, Sequence[str]]]
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None # type: Optional[str]
): ):
# type: (...) -> () # type: (...) -> ()
""" """
@ -2909,7 +2945,23 @@ class PipelineDecorator(PipelineController):
print(node.name, ' failed') print(node.name, ' failed')
# allow up to 5 retries (total of 6 runs) # allow up to 5 retries (total of 6 runs)
return retries < 5 return retries < 5
:param docker: Select the docker image in which the remote session will be executed
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added.
:param repo: Optional, specify a repository to attach to the pipeline controller when executing remotely.
Allows users to execute the controller inside the specified repository, enabling them to load modules/scripts
from the repository. Notice that the execution working directory will be the repository root folder.
Supports both a git repo url link and a local repository path (automatically converted into the remote
git/commit as it is currently checked out).
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
:param repo_branch: Optional, specify the remote repository branch (ignored if a local repo path is used)
:param repo_commit: Optional, specify the repository commit id (ignored if a local repo path is used)
""" """
super(PipelineDecorator, self).__init__( super(PipelineDecorator, self).__init__(
name=name, name=name,
@ -2921,6 +2973,13 @@ class PipelineDecorator(PipelineController):
abort_on_failure=abort_on_failure, abort_on_failure=abort_on_failure,
add_run_number=add_run_number, add_run_number=add_run_number,
retry_on_failure=retry_on_failure, retry_on_failure=retry_on_failure,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
packages=packages,
repo=repo,
repo_branch=repo_branch,
repo_commit=repo_commit
) )
# if we are in eager execution, make sure parent class knows it # if we are in eager execution, make sure parent class knows it
@ -3595,7 +3654,14 @@ class PipelineDecorator(PipelineController):
add_run_number=True, # type: bool add_run_number=True, # type: bool
args_map=None, # type: dict[str, List[str]] args_map=None, # type: dict[str, List[str]]
start_controller_locally=False, # type: bool start_controller_locally=False, # type: bool
retry_on_failure=None # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
packages=None, # type: Optional[Union[str, Sequence[str]]]
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None # type: Optional[str]
): ):
# type: (...) -> Callable # type: (...) -> Callable
""" """
@ -3662,7 +3728,23 @@ class PipelineDecorator(PipelineController):
print(node.name, ' failed') print(node.name, ' failed')
# allow up to 5 retries (total of 6 runs) # allow up to 5 retries (total of 6 runs)
return retries < 5 return retries < 5
:param docker: Select the docker image in which the remote session will be executed
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added based on the imports used in the function.
:param repo: Optional, specify a repository to attach to the function when executing remotely.
Allows users to execute the function inside the specified repository, enabling them to load modules/scripts
from the repository. Notice that the execution working directory will be the repository root folder.
Supports both a git repo url link and a local repository path (automatically converted into the remote
git/commit as it is currently checked out).
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
:param repo_branch: Optional, specify the remote repository branch (ignored if a local repo path is used)
:param repo_commit: Optional, specify the repository commit id (ignored if a local repo path is used)
""" """
def decorator_wrap(func): def decorator_wrap(func):
@ -3700,6 +3782,13 @@ class PipelineDecorator(PipelineController):
abort_on_failure=abort_on_failure, abort_on_failure=abort_on_failure,
add_run_number=add_run_number, add_run_number=add_run_number,
retry_on_failure=retry_on_failure, retry_on_failure=retry_on_failure,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
packages=packages,
repo=repo,
repo_branch=repo_branch,
repo_commit=repo_commit
) )
ret_val = func(**pipeline_kwargs) ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references() LazyEvalWrapper.trigger_all_remote_references()
@ -3741,7 +3830,14 @@ class PipelineDecorator(PipelineController):
target_project=target_project, target_project=target_project,
abort_on_failure=abort_on_failure, abort_on_failure=abort_on_failure,
add_run_number=add_run_number, add_run_number=add_run_number,
retry_on_failure=retry_on_failure retry_on_failure=retry_on_failure,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
packages=packages,
repo=repo,
repo_branch=repo_branch,
repo_commit=repo_commit
) )
a_pipeline._args_map = args_map or {} a_pipeline._args_map = args_map or {}

View File

@ -276,7 +276,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
# add ide info into task runtime_properties # add ide info into task runtime_properties
# noinspection PyBroadException # noinspection PyBroadException
try: try:
self._set_runtime_properties(runtime_properties={"IDE": result.script["ide"]}) self._set_runtime_properties(runtime_properties={"ide": result.script["ide"]})
except Exception as ex: except Exception as ex:
self.log.info("Failed logging ide information: {}".format(ex)) self.log.info("Failed logging ide information: {}".format(ex))
@ -1396,6 +1396,52 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
execution.docker_cmd = image + (' {}'.format(arguments) if arguments else '') execution.docker_cmd = image + (' {}'.format(arguments) if arguments else '')
self._edit(execution=execution) self._edit(execution=execution)
def set_packages(self, packages):
    # type: (Union[str, Sequence[str]]) -> ()
    """
    Manually specify a list of required packages or a local requirements.txt file.

    :param packages: The list of packages or the path to the requirements.txt file.
        Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
    """
    if not packages:
        return
    is_requirements_file = isinstance(packages, str) and os.path.exists(packages)
    if is_requirements_file:
        # a path to an existing requirements.txt - pass its stripped lines along
        with open(packages) as requirements_file:
            requirement_lines = [entry.strip() for entry in requirements_file]
        # noinspection PyProtectedMember
        self._update_requirements(requirement_lines)
    else:
        # a packages sequence (or a non-existing path string, passed through as-is)
        # noinspection PyProtectedMember
        self._update_requirements(packages)
def set_repo(self, repo, branch=None, commit=None):
    # type: (str, Optional[str], Optional[str]) -> ()
    """
    Specify a repository to attach to the task.
    Allows users to execute the task inside the specified repository, enabling them to load modules/scripts
    from the repository. Notice that the execution working directory will be the repository root folder.
    Supports both a git repo url link and a local repository path (automatically converted into the remote
    git/commit as it is currently checked out).
    Example remote url: 'https://github.com/user/repo.git'.
    Example local repo copy: './repo' -> will automatically store the remote
    repo url and commit ID based on the locally cloned copy.

    :param repo: Remote URL for the repository to use, OR path to a local copy of the git repository
        Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'
    :param branch: Optional, specify the remote repository branch (ignored if a local repo path is used)
    :param commit: Optional, specify the repository commit id (ignored if a local repo path is used)
    """
    if not repo:
        return
    with self._edit_lock:
        # refresh the local copy of the task before mutating its script section
        self.reload()
        script_info = self.data.script
        script_info.repository = repo
        if branch:
            script_info.branch = branch
        if commit:
            script_info.version_num = commit
        self._edit(script=script_info)
def get_base_docker(self): def get_base_docker(self):
# type: () -> str # type: () -> str
"""Get the base Docker command (image) that is set for this experiment.""" """Get the base Docker command (image) that is set for this experiment."""
@ -2330,7 +2376,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return res return res
def _update_requirements(self, requirements): def _update_requirements(self, requirements):
# type: (Union[dict, str]) -> () # type: (Union[dict, str, Sequence[str]]) -> ()
if not isinstance(requirements, dict): if not isinstance(requirements, dict):
requirements = {'pip': requirements} requirements = {'pip': requirements}

View File

@ -801,7 +801,7 @@ class Dataset(object):
return True return True
def set_metadata(self, metadata, metadata_name='metadata', ui_visible=True): def set_metadata(self, metadata, metadata_name='metadata', ui_visible=True):
# type: (Union[numpy.array, pd.DataFrame, Dict[str, Any]], str, bool) -> () # type: (Union[numpy.array, pd.DataFrame, Dict[str, Any]], str, bool) -> () # noqa: F821
""" """
Attach a user-defined metadata to the dataset. Check `Task.upload_artifact` for supported types. Attach a user-defined metadata to the dataset. Check `Task.upload_artifact` for supported types.
If type is Optionally make it visible as a table in the UI. If type is Optionally make it visible as a table in the UI.
@ -821,7 +821,7 @@ class Dataset(object):
) )
def get_metadata(self, metadata_name='metadata'): def get_metadata(self, metadata_name='metadata'):
# type: (str) -> Optional[numpy.array, pd.DataFrame, dict, str, bool] # type: (str) -> Optional[numpy.array, pd.DataFrame, dict, str, bool] # noqa: F821
""" """
Get attached metadata back in its original format. Will return None if none was found. Get attached metadata back in its original format. Will return None if none was found.
""" """

View File

@ -2400,6 +2400,41 @@ class Task(_Task):
docker_setup_bash_script=docker_setup_bash_script docker_setup_bash_script=docker_setup_bash_script
) )
def set_packages(self, packages):
    # type: (Union[str, Sequence[str]]) -> ()
    """
    Manually specify a list of required packages or a local requirements.txt file.
    When running remotely the call is ignored

    :param packages: The list of packages or the path to the requirements.txt file.
        Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
    """
    # remote execution keeps the requirements already stored on the task - do nothing
    if not running_remotely():
        super(Task, self).set_packages(packages)
def set_repo(self, repo, branch=None, commit=None):
    # type: (str, Optional[str], Optional[str]) -> ()
    """
    Specify a repository to attach to the task.
    Allows users to execute the task inside the specified repository, enabling them to load modules/scripts
    from the repository. Notice that the execution working directory will be the repository root folder.
    Supports both a git repo url link and a local repository path (automatically converted into the remote
    git/commit as it is currently checked out).
    Example remote url: 'https://github.com/user/repo.git'.
    Example local repo copy: './repo' -> will automatically store the remote
    repo url and commit ID based on the locally cloned copy.
    When executing remotely, this call will not override the repository data (it is ignored)

    :param repo: Remote URL for the repository to use, OR path to a local copy of the git repository
        Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'
    :param branch: Optional, specify the remote repository branch (ignored if a local repo path is used)
    :param commit: Optional, specify the repository commit id (ignored if a local repo path is used)
    """
    # remote execution keeps the repository data already stored on the task - do nothing
    if not running_remotely():
        super(Task, self).set_repo(repo, branch=branch, commit=commit)
def set_resource_monitor_iteration_timeout(self, seconds_from_start=1800): def set_resource_monitor_iteration_timeout(self, seconds_from_start=1800):
# type: (float) -> bool # type: (float) -> bool
""" """