Support custom working directory for pipelines (#1194)

This commit is contained in:
allegroai 2024-03-28 15:04:09 +02:00
parent aea7e3ec6d
commit 7ab2197ec0
2 changed files with 122 additions and 38 deletions

View File

@ -179,7 +179,8 @@ class PipelineController(object):
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> None
"""
@ -272,6 +273,7 @@ class PipelineController(object):
:param skip_global_imports: If True, global imports will not be included in the steps' execution when creating
the steps from a functions, otherwise all global imports will be automatically imported in a safe manner at
the beginning of each steps execution. Default is False
:param working_dir: Working directory to launch the pipeline from.
"""
if auto_version_bump is not None:
warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
@ -354,7 +356,7 @@ class PipelineController(object):
docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script
)
self._task.set_packages(packages)
self._task.set_repo(repo, branch=repo_branch, commit=repo_commit)
self._task.set_script(repository=repo, branch=repo_branch, commit=repo_commit, working_dir=working_dir)
self._auto_connect_task = bool(self._task)
# make sure we add to the main Task the pipeline tag
if self._task and not self._pipeline_as_sub_project:
@ -672,7 +674,8 @@ class PipelineController(object):
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False # type: Optional[bool]
draft=False, # type: Optional[bool]
working_dir=None # type: Optional[str]
):
# type: (...) -> bool
"""
@ -842,6 +845,7 @@ class PipelineController(object):
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the script from.
:return: True if successful
"""
@ -884,6 +888,7 @@ class PipelineController(object):
tags=tags,
output_uri=output_uri,
draft=draft,
working_dir=working_dir
)
def start(
@ -1509,10 +1514,26 @@ class PipelineController(object):
self._task.add_tags(tags)
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, task_type, repo, branch, commit, helper_functions, output_uri=None
self,
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
branch,
commit,
helper_functions,
output_uri=None,
working_dir=None
):
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
@ -1537,7 +1558,8 @@ class PipelineController(object):
task_template_header=self._task_template_header,
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports
skip_global_imports=self._skip_global_imports,
working_dir=working_dir
)
return task_definition
@ -1986,7 +2008,8 @@ class PipelineController(object):
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False # type: Optional[bool]
draft=False, # type: Optional[bool]
working_dir=None # type: Optional[str]
):
# type: (...) -> bool
"""
@ -2156,6 +2179,7 @@ class PipelineController(object):
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the step from.
:return: True if successful
"""
@ -2206,21 +2230,51 @@ class PipelineController(object):
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
repo_branch,
repo_commit,
helper_functions,
output_uri=output_uri,
working_dir=working_dir,
)
elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
repo_branch,
repo_commit,
helper_functions,
output_uri=output_uri,
working_dir=working_dir,
)
# update configuration with the task definitions
# noinspection PyProtectedMember
self._task._set_configuration(
@ -3338,7 +3392,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> ()
"""
@ -3424,6 +3479,7 @@ class PipelineDecorator(PipelineController):
:param skip_global_imports: If True, global imports will not be included in the steps' execution, otherwise all
global imports will be automatically imported in a safe manner at the beginning of each steps execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
"""
super(PipelineDecorator, self).__init__(
name=name,
@ -3446,7 +3502,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports
skip_global_imports=skip_global_imports,
working_dir=working_dir
)
# if we are in eager execution, make sure parent class knows it
@ -3664,29 +3721,44 @@ class PipelineDecorator(PipelineController):
self._force_task_configuration_update()
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, task_type, repo, branch, commit,
helper_functions, output_uri=None
self,
docker,
docker_args,
docker_bash_setup_script,
function,
function_input_artifacts,
function_kwargs,
function_return,
auto_connect_frameworks,
auto_connect_arg_parser,
packages,
project_name,
task_name,
task_type,
repo,
branch,
commit,
helper_functions,
output_uri=None,
working_dir=None
):
def sanitize(function_source):
matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source)
if matched:
function_source = function_source[matched.span()[1]:]
function_source = function_source[matched.span()[1] :]
# find the last ")"
open_parenthesis = 0
last_index = -1
for i, c in enumerate(function_source):
if not open_parenthesis and c == ')':
if not open_parenthesis and c == ")":
last_index = i
break
elif c == ')':
elif c == ")":
open_parenthesis -= 1
elif c == '(':
elif c == "(":
open_parenthesis += 1
if last_index >= 0:
function_source = function_source[last_index+1:].lstrip()
function_source = function_source[last_index + 1 :].lstrip()
return function_source
task_definition = CreateFromFunction.create_task_from_function(
@ -3713,7 +3785,8 @@ class PipelineDecorator(PipelineController):
_sanitize_function=sanitize,
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports
skip_global_imports=self._skip_global_imports,
working_dir=working_dir,
)
return task_definition
@ -3791,7 +3864,8 @@ class PipelineDecorator(PipelineController):
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None, # type: Optional[Union[str, bool]]
draft=False # type: Optional[bool]
draft=False, # type: Optional[bool]
working_dir=None # type: Optional[str]
):
# type: (...) -> Callable
"""
@ -3932,6 +4006,7 @@ class PipelineDecorator(PipelineController):
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param draft: (default False). If True, the Task will be created as a draft task.
:param working_dir: Working directory to launch the step from.
:return: function wrapper
"""
@ -3979,6 +4054,7 @@ class PipelineDecorator(PipelineController):
tags=tags,
output_uri=output_uri,
draft=draft,
working_dir=working_dir
)
if cls._singleton:
@ -4213,7 +4289,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> Callable
"""
@ -4330,6 +4407,7 @@ class PipelineDecorator(PipelineController):
:param skip_global_imports: If True, global imports will not be included in the steps' execution, otherwise all
global imports will be automatically imported in a safe manner at the beginning of each steps execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
"""
def decorator_wrap(func):
@ -4377,7 +4455,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports
skip_global_imports=skip_global_imports,
working_dir=working_dir
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@ -4430,7 +4509,8 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports
skip_global_imports=skip_global_imports,
working_dir=working_dir
)
a_pipeline._args_map = args_map or {}

View File

@ -574,7 +574,8 @@ if __name__ == '__main__':
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
_sanitize_function=None, # type: Optional[Callable[[str], str]]
_sanitize_helper_functions=None, # type: Optional[Callable[[str], str]]
skip_global_imports=False # type: bool
skip_global_imports=False, # type: bool
working_dir=None # type: Optional[str]
):
# type: (...) -> Optional[Dict, Task]
"""
@ -660,6 +661,8 @@ if __name__ == '__main__':
:param skip_global_imports: If True, the global imports will not be fetched from the function's file, otherwise
all global imports will be automatically imported in a safe manner at the beginning of the function's
execution. Default is False
:param working_dir: Optional, Working directory to launch the script from.
:return: Newly created Task object
"""
# not set -> equals True
@ -774,6 +777,7 @@ if __name__ == '__main__':
docker_bash_setup_script=docker_bash_setup_script,
output_uri=output_uri,
add_task_init_call=False,
working_directory=working_dir
)
entry_point = '{}.py'.format(function_name)
task = populate.create_task(dry_run=dry_run)
@ -781,7 +785,7 @@ if __name__ == '__main__':
if dry_run:
task['script']['diff'] = task_template
task['script']['entry_point'] = entry_point
task['script']['working_dir'] = '.'
task['script']['working_dir'] = working_dir or '.'
task['hyperparams'] = {
cls.kwargs_section: {
k: dict(section=cls.kwargs_section, name=k,