diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index 5ba63d82..8a40111c 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -179,7 +179,8 @@ class PipelineController(object):
         artifact_serialization_function=None,  # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
         artifact_deserialization_function=None,  # type: Optional[Callable[[bytes], Any]]
         output_uri=None,  # type: Optional[Union[str, bool]]
-        skip_global_imports=False  # type: bool
+        skip_global_imports=False,  # type: bool
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> None
         """
@@ -272,6 +273,7 @@ class PipelineController(object):
         :param skip_global_imports: If True, global imports will not be included in the steps' execution
             when creating the steps from a functions, otherwise all global imports will be automatically imported
             in a safe manner at the beginning of each step’s execution. Default is False
+        :param working_dir: Working directory to launch the pipeline from.
         """
         if auto_version_bump is not None:
             warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
@@ -354,7 +356,7 @@ class PipelineController(object):
             docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script
         )
         self._task.set_packages(packages)
-        self._task.set_repo(repo, branch=repo_branch, commit=repo_commit)
+        self._task.set_script(repository=repo, branch=repo_branch, commit=repo_commit, working_dir=working_dir)
         self._auto_connect_task = bool(self._task)
         # make sure we add to the main Task the pipeline tag
         if self._task and not self._pipeline_as_sub_project:
@@ -672,7 +674,8 @@ class PipelineController(object):
         status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
         tags=None,  # type: Optional[Union[str, Sequence[str]]]
         output_uri=None,  # type: Optional[Union[str, bool]]
-        draft=False  # type: Optional[bool]
+        draft=False,  # type: Optional[bool]
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> bool
         """
@@ -842,6 +845,7 @@ class PipelineController(object):
         :param output_uri: The storage / output url for this step. This is the default location for output
             models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
         :param draft: (default False). If True, the Task will be created as a draft task.
+        :param working_dir: Working directory to launch the script from.

         :return: True if successful
         """
@@ -884,6 +888,7 @@ class PipelineController(object):
             tags=tags,
             output_uri=output_uri,
             draft=draft,
+            working_dir=working_dir
         )

     def start(
@@ -1509,10 +1514,26 @@ class PipelineController(object):
         self._task.add_tags(tags)

     def _create_task_from_function(
-        self, docker, docker_args, docker_bash_setup_script,
-        function, function_input_artifacts, function_kwargs, function_return,
-        auto_connect_frameworks, auto_connect_arg_parser,
-        packages, project_name, task_name, task_type, repo, branch, commit, helper_functions, output_uri=None
+        self,
+        docker,
+        docker_args,
+        docker_bash_setup_script,
+        function,
+        function_input_artifacts,
+        function_kwargs,
+        function_return,
+        auto_connect_frameworks,
+        auto_connect_arg_parser,
+        packages,
+        project_name,
+        task_name,
+        task_type,
+        repo,
+        branch,
+        commit,
+        helper_functions,
+        output_uri=None,
+        working_dir=None
     ):
         task_definition = CreateFromFunction.create_task_from_function(
             a_function=function,
@@ -1537,7 +1558,8 @@ class PipelineController(object):
             task_template_header=self._task_template_header,
             artifact_serialization_function=self._artifact_serialization_function,
             artifact_deserialization_function=self._artifact_deserialization_function,
-            skip_global_imports=self._skip_global_imports
+            skip_global_imports=self._skip_global_imports,
+            working_dir=working_dir
         )
         return task_definition

@@ -1986,7 +2008,8 @@ class PipelineController(object):
         status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
         tags=None,  # type: Optional[Union[str, Sequence[str]]]
         output_uri=None,  # type: Optional[Union[str, bool]]
-        draft=False  # type: Optional[bool]
+        draft=False,  # type: Optional[bool]
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> bool
         """
@@ -2156,6 +2179,7 @@ class PipelineController(object):
         :param output_uri: The storage / output url for this step. This is the default location for output
             models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
         :param draft: (default False). If True, the Task will be created as a draft task.
+        :param working_dir: Working directory to launch the step from.

         :return: True if successful
         """
@@ -2206,21 +2230,51 @@ class PipelineController(object):
             project_name = project_name or self._get_target_project() or self._task.get_project_name()

             task_definition = self._create_task_from_function(
-                docker, docker_args, docker_bash_setup_script, function,
-                function_input_artifacts, function_kwargs, function_return,
-                auto_connect_frameworks, auto_connect_arg_parser,
-                packages, project_name, task_name,
-                task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
+                docker,
+                docker_args,
+                docker_bash_setup_script,
+                function,
+                function_input_artifacts,
+                function_kwargs,
+                function_return,
+                auto_connect_frameworks,
+                auto_connect_arg_parser,
+                packages,
+                project_name,
+                task_name,
+                task_type,
+                repo,
+                repo_branch,
+                repo_commit,
+                helper_functions,
+                output_uri=output_uri,
+                working_dir=working_dir,
+            )
         elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
             project_name = project_name or self._get_target_project() or self._task.get_project_name()

             task_definition = self._create_task_from_function(
-                docker, docker_args, docker_bash_setup_script, function,
-                function_input_artifacts, function_kwargs, function_return,
-                auto_connect_frameworks, auto_connect_arg_parser,
-                packages, project_name, task_name,
-                task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
+                docker,
+                docker_args,
+                docker_bash_setup_script,
+                function,
+                function_input_artifacts,
+                function_kwargs,
+                function_return,
+                auto_connect_frameworks,
+                auto_connect_arg_parser,
+                packages,
+                project_name,
+                task_name,
+                task_type,
+                repo,
+                repo_branch,
+                repo_commit,
+                helper_functions,
+                output_uri=output_uri,
+                working_dir=working_dir,
+            )

         # update configuration with the task definitions
         # noinspection PyProtectedMember
         self._task._set_configuration(
@@ -3338,7 +3392,8 @@ class PipelineDecorator(PipelineController):
         artifact_serialization_function=None,  # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
         artifact_deserialization_function=None,  # type: Optional[Callable[[bytes], Any]]
         output_uri=None,  # type: Optional[Union[str, bool]]
-        skip_global_imports=False  # type: bool
+        skip_global_imports=False,  # type: bool
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> ()
         """
@@ -3424,6 +3479,7 @@ class PipelineDecorator(PipelineController):
         :param skip_global_imports: If True, global imports will not be included in the steps' execution,
             otherwise all global imports will be automatically imported in a safe manner at the beginning of
             each step’s execution. Default is False
+        :param working_dir: Working directory to launch the pipeline from.
         """
         super(PipelineDecorator, self).__init__(
             name=name,
@@ -3446,7 +3502,8 @@ class PipelineDecorator(PipelineController):
             artifact_serialization_function=artifact_serialization_function,
             artifact_deserialization_function=artifact_deserialization_function,
             output_uri=output_uri,
-            skip_global_imports=skip_global_imports
+            skip_global_imports=skip_global_imports,
+            working_dir=working_dir
         )

         # if we are in eager execution, make sure parent class knows it
@@ -3664,29 +3721,44 @@ class PipelineDecorator(PipelineController):
         self._force_task_configuration_update()

     def _create_task_from_function(
-        self, docker, docker_args, docker_bash_setup_script,
-        function, function_input_artifacts, function_kwargs, function_return,
-        auto_connect_frameworks, auto_connect_arg_parser,
-        packages, project_name, task_name, task_type, repo, branch, commit,
-        helper_functions, output_uri=None
+        self,
+        docker,
+        docker_args,
+        docker_bash_setup_script,
+        function,
+        function_input_artifacts,
+        function_kwargs,
+        function_return,
+        auto_connect_frameworks,
+        auto_connect_arg_parser,
+        packages,
+        project_name,
+        task_name,
+        task_type,
+        repo,
+        branch,
+        commit,
+        helper_functions,
+        output_uri=None,
+        working_dir=None
     ):
         def sanitize(function_source):
             matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source)
             if matched:
-                function_source = function_source[matched.span()[1]:]
+                function_source = function_source[matched.span()[1] :]
             # find the last ")"
             open_parenthesis = 0
             last_index = -1
             for i, c in enumerate(function_source):
-                if not open_parenthesis and c == ')':
+                if not open_parenthesis and c == ")":
                     last_index = i
                     break
-                elif c == ')':
+                elif c == ")":
                     open_parenthesis -= 1
-                elif c == '(':
+                elif c == "(":
                     open_parenthesis += 1
             if last_index >= 0:
-                function_source = function_source[last_index+1:].lstrip()
+                function_source = function_source[last_index + 1 :].lstrip()
             return function_source

         task_definition = CreateFromFunction.create_task_from_function(
@@ -3713,7 +3785,8 @@ class PipelineDecorator(PipelineController):
             _sanitize_function=sanitize,
             artifact_serialization_function=self._artifact_serialization_function,
             artifact_deserialization_function=self._artifact_deserialization_function,
-            skip_global_imports=self._skip_global_imports
+            skip_global_imports=self._skip_global_imports,
+            working_dir=working_dir,
         )
         return task_definition

@@ -3791,7 +3864,8 @@ class PipelineDecorator(PipelineController):
         status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
         tags=None,  # type: Optional[Union[str, Sequence[str]]]
         output_uri=None,  # type: Optional[Union[str, bool]]
-        draft=False  # type: Optional[bool]
+        draft=False,  # type: Optional[bool]
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> Callable
         """
@@ -3932,6 +4006,7 @@ class PipelineDecorator(PipelineController):
         :param output_uri: The storage / output url for this step. This is the default location for output
             models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
         :param draft: (default False). If True, the Task will be created as a draft task.
+        :param working_dir: Working directory to launch the step from.

         :return: function wrapper
         """
@@ -3979,6 +4054,7 @@ class PipelineDecorator(PipelineController):
                 tags=tags,
                 output_uri=output_uri,
                 draft=draft,
+                working_dir=working_dir
             )

             if cls._singleton:
@@ -4213,7 +4289,8 @@ class PipelineDecorator(PipelineController):
         artifact_serialization_function=None,  # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
         artifact_deserialization_function=None,  # type: Optional[Callable[[bytes], Any]]
         output_uri=None,  # type: Optional[Union[str, bool]]
-        skip_global_imports=False  # type: bool
+        skip_global_imports=False,  # type: bool
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> Callable
         """
@@ -4330,6 +4407,7 @@ class PipelineDecorator(PipelineController):
         :param skip_global_imports: If True, global imports will not be included in the steps' execution,
             otherwise all global imports will be automatically imported in a safe manner at the beginning of
             each step’s execution. Default is False
+        :param working_dir: Working directory to launch the pipeline from.
         """

         def decorator_wrap(func):
@@ -4377,7 +4455,8 @@ class PipelineDecorator(PipelineController):
                     artifact_serialization_function=artifact_serialization_function,
                     artifact_deserialization_function=artifact_deserialization_function,
                     output_uri=output_uri,
-                    skip_global_imports=skip_global_imports
+                    skip_global_imports=skip_global_imports,
+                    working_dir=working_dir
                 )
                 ret_val = func(**pipeline_kwargs)
                 LazyEvalWrapper.trigger_all_remote_references()
@@ -4430,7 +4509,8 @@ class PipelineDecorator(PipelineController):
                 artifact_serialization_function=artifact_serialization_function,
                 artifact_deserialization_function=artifact_deserialization_function,
                 output_uri=output_uri,
-                skip_global_imports=skip_global_imports
+                skip_global_imports=skip_global_imports,
+                working_dir=working_dir
             )

             a_pipeline._args_map = args_map or {}
diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py
index fae03a28..1a0db95c 100644
--- a/clearml/backend_interface/task/populate.py
+++ b/clearml/backend_interface/task/populate.py
@@ -574,7 +574,8 @@ if __name__ == '__main__':
         artifact_deserialization_function=None,  # type: Optional[Callable[[bytes], Any]]
         _sanitize_function=None,  # type: Optional[Callable[[str], str]]
         _sanitize_helper_functions=None,  # type: Optional[Callable[[str], str]]
-        skip_global_imports=False  # type: bool
+        skip_global_imports=False,  # type: bool
+        working_dir=None  # type: Optional[str]
     ):
         # type: (...) -> Optional[Dict, Task]
         """
@@ -660,6 +661,8 @@ if __name__ == '__main__':
         :param skip_global_imports: If True, the global imports will not be fetched from the function's file, otherwise
             all global imports will be automatically imported in a safe manner at the beginning of the function's
             execution. Default is False
+        :param working_dir: Optional, Working directory to launch the script from.
+
         :return: Newly created Task object
         """
         # not set -> equals True
@@ -774,6 +777,7 @@ if __name__ == '__main__':
             docker_bash_setup_script=docker_bash_setup_script,
             output_uri=output_uri,
             add_task_init_call=False,
+            working_directory=working_dir
         )
         entry_point = '{}.py'.format(function_name)
         task = populate.create_task(dry_run=dry_run)
@@ -781,7 +785,7 @@ if __name__ == '__main__':
         if dry_run:
             task['script']['diff'] = task_template
             task['script']['entry_point'] = entry_point
-            task['script']['working_dir'] = '.'
+            task['script']['working_dir'] = working_dir or '.'
             task['hyperparams'] = {
                 cls.kwargs_section: {
                     k: dict(section=cls.kwargs_section, name=k,
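Usage note: a minimal sketch of how the new `working_dir` argument could be used through the explicit PipelineController API once this patch is applied. The project name, step name, repo URL, and directory layout below are hypothetical; per the hunks above, `working_dir` on the controller is forwarded to `Task.set_script(working_dir=...)`, and `working_dir` on a function step ends up in the generated step task's `script.working_dir`.

from clearml import PipelineController


def train(dataset_id):
    # hypothetical step body; a real step would fetch the dataset and fit a model
    print("training on", dataset_id)
    return "model-42"


pipe = PipelineController(
    name="demo pipeline",  # hypothetical names throughout
    project="examples",
    version="1.0.0",
    repo="https://github.com/user/repo.git",
    repo_branch="main",
    working_dir="pipelines",  # new: forwarded to Task.set_script(working_dir=...)
)

pipe.add_function_step(
    name="train",
    function=train,
    function_kwargs=dict(dataset_id="abc123"),
    function_return=["model_id"],
    working_dir="pipelines/steps",  # new: becomes task['script']['working_dir'] for the step
)

pipe.start_locally(run_pipeline_steps_locally=True)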
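The decorator API gains the same knob on both `@PipelineDecorator.component` and `@PipelineDecorator.pipeline`; again a sketch under the same assumptions, with placeholder names:

from clearml import PipelineDecorator


@PipelineDecorator.component(return_values=["model_id"], working_dir="pipelines/steps")
def train(dataset_id):
    # placeholder body; the generated step script launches from "pipelines/steps"
    print("training on", dataset_id)
    return "model-42"


@PipelineDecorator.pipeline(
    name="demo pipeline",
    project="examples",
    version="1.0.0",
    working_dir="pipelines",  # new: working directory for the controller task itself
)
def run(dataset_id="abc123"):
    model_id = train(dataset_id)
    print("trained:", model_id)


if __name__ == "__main__":
    PipelineDecorator.run_locally()
    run()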