Allow for local imports in pipeline steps

clearml 2024-12-07 17:19:31 +02:00
parent a494a926f2
commit 65c18798f4
2 changed files with 45 additions and 9 deletions
clearml/automation/controller.py

@@ -222,7 +222,8 @@ class PipelineController(object):
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False, # type: bool
- working_dir=None # type: Optional[str]
+ working_dir=None, # type: Optional[str]
+ enable_local_imports=True # type: bool
):
# type: (...) -> None
"""
@@ -317,6 +318,11 @@ class PipelineController(object):
the steps from functions, otherwise all global imports will be automatically imported in a safe manner at
the beginning of each step's execution. Default is False
:param working_dir: Working directory to launch the pipeline from.
+ :param enable_local_imports: If True, allow pipeline steps to import from local files
+     by appending the directory in which the pipeline controller script resides
+     (sys.path[0]) to each step's PYTHONPATH.
+     If False, the directory won't be appended to PYTHONPATH. Default is True.
+     Ignored when running remotely.
"""
if auto_version_bump is not None:
warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
@@ -353,6 +359,7 @@ class PipelineController(object):
self._artifact_serialization_function = artifact_serialization_function
self._artifact_deserialization_function = artifact_deserialization_function
self._skip_global_imports = skip_global_imports
+ self._enable_local_imports = enable_local_imports
if not self._task:
pipeline_project_args = self._create_pipeline_project_args(name, project)
@@ -405,7 +412,7 @@ class PipelineController(object):
# add direct link to the pipeline page
if self._pipeline_as_sub_project and self._task:
if add_run_number and self._task.running_locally():
- self._add_pipeline_name_run_number()
+ self._add_pipeline_name_run_number(self._task)
# noinspection PyProtectedMember
self._task.get_logger().report_text('ClearML pipeline page: {}'.format(
'{}/pipelines/{}/experiments/{}'.format(
@@ -2672,6 +2679,7 @@ class PipelineController(object):
task_overrides=task_overrides,
allow_caching=node.cache_executed_step,
output_uri=node.output_uri,
+ enable_local_imports=self._enable_local_imports,
**extra_args
)
except Exception:
@@ -3663,7 +3671,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False, # type: bool
- working_dir=None # type: Optional[str]
+ working_dir=None, # type: Optional[str]
+ enable_local_imports=True # type: bool
):
# type: (...) -> ()
"""
@@ -3751,6 +3760,11 @@ class PipelineDecorator(PipelineController):
global imports will be automatically imported in a safe manner at the beginning of each step's execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
+ :param enable_local_imports: If True, allow pipeline steps to import from local files
+     by appending the directory in which the pipeline controller script resides
+     (sys.path[0]) to each step's PYTHONPATH.
+     If False, the directory won't be appended to PYTHONPATH. Default is True.
+     Ignored when running remotely.
"""
super(PipelineDecorator, self).__init__(
name=name,
@@ -3774,9 +3788,9 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports,
- working_dir=working_dir
+ working_dir=working_dir,
+ enable_local_imports=enable_local_imports
)
# if we are in eager execution, make sure parent class knows it
if self._eager_execution_instance:
self._mock_execution = True
@@ -4064,7 +4078,7 @@ class PipelineDecorator(PipelineController):
artifact_serialization_function=self._artifact_serialization_function,
artifact_deserialization_function=self._artifact_deserialization_function,
skip_global_imports=self._skip_global_imports,
- working_dir=working_dir,
+ working_dir=working_dir
)
return task_definition
@@ -4595,7 +4609,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None, # type: Optional[Union[str, bool]]
skip_global_imports=False, # type: bool
- working_dir=None # type: Optional[str]
+ working_dir=None, # type: Optional[str]
+ enable_local_imports=True # type: bool
):
# type: (...) -> Callable
"""
@@ -4714,6 +4729,11 @@ class PipelineDecorator(PipelineController):
global imports will be automatically imported in a safe manner at the beginning of each step's execution.
Default is False
:param working_dir: Working directory to launch the pipeline from.
+ :param enable_local_imports: If True, allow pipeline steps to import from local files
+     by appending the directory in which the pipeline controller script resides
+     (sys.path[0]) to each step's PYTHONPATH.
+     If False, the directory won't be appended to PYTHONPATH. Default is True.
+     Ignored when running remotely.
"""
def decorator_wrap(func):
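
A matching sketch for the decorator interface (hypothetical names; my_utils is again an assumed local module). The PYTHONPATH extension takes effect when steps run as local subprocesses, e.g. under PipelineDecorator.run_locally():

from clearml import PipelineDecorator

@PipelineDecorator.component(return_values=["result"])
def step_one():
    import my_utils  # resolvable because the controller's directory is on PYTHONPATH
    return my_utils.compute()

@PipelineDecorator.pipeline(name="local-imports-demo", project="examples", enable_local_imports=True)
def main():
    result = step_one()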
@@ -4762,7 +4782,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports,
- working_dir=working_dir
+ working_dir=working_dir,
+ enable_local_imports=enable_local_imports
)
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
@@ -4816,7 +4837,8 @@ class PipelineDecorator(PipelineController):
artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri,
skip_global_imports=skip_global_imports,
- working_dir=working_dir
+ working_dir=working_dir,
+ enable_local_imports=enable_local_imports
)
a_pipeline._args_map = args_map or {}

clearml/automation/job.py

@@ -522,6 +522,7 @@ class ClearmlJob(BaseJob):
allow_caching=False, # type: bool
target_project=None, # type: Optional[str]
output_uri=None, # type: Optional[Union[str, bool]]
+ enable_local_imports=True, # type: bool
**kwargs # type: Any
):
# type: (...) -> ()
@@ -548,9 +549,14 @@ class ClearmlJob(BaseJob):
:param Union[str, bool] output_uri: The storage / output url for this job. This is the default location for
output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param str target_project: Optional, Set the target project name to create the cloned Task in.
+ :param enable_local_imports: If True, allow jobs to import from local files
+     by appending sys.path[0] (the directory of the entry script) to PYTHONPATH.
+     If False, that directory won't be appended to PYTHONPATH. Default is True.
+     Ignored when running remotely.
"""
super(ClearmlJob, self).__init__()
base_temp_task = Task.get_task(task_id=base_task_id)
+ self._enable_local_imports = enable_local_imports
if disable_clone_task:
self.task = base_temp_task
task_status = self.task.status
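
The same switch at the job level, as a minimal sketch (the base task ID is a placeholder):

from clearml.automation.job import ClearmlJob

job = ClearmlJob(
    base_task_id="<base-task-id>",  # placeholder, not a real ID
    enable_local_imports=True,
)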
@@ -717,6 +723,14 @@ class LocalClearmlJob(ClearmlJob):
env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
+ try:
+     if self._enable_local_imports:
+         current_python_path = env.get("PYTHONPATH")
+         env["PYTHONPATH"] = (
+             "{}:{}".format(current_python_path, sys.path[0]) if current_python_path else sys.path[0]
+         )
+ except Exception as e:
+     logger.warning("Could not append local path to PYTHONPATH: {}".format(e))
self.task.mark_started()
self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env)
return True
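
For reference, the technique above in isolation: a standalone sketch (not ClearML code; step_script.py is a hypothetical child script) that extends PYTHONPATH for a subprocess. It joins with os.pathsep for portability across platforms, whereas the commit joins with ':' (the POSIX separator):

import os
import subprocess
import sys

env = os.environ.copy()
extra = sys.path[0]  # directory of the launching script
# Append the script directory, preserving any existing PYTHONPATH.
env["PYTHONPATH"] = "{}{}{}".format(env["PYTHONPATH"], os.pathsep, extra) if env.get("PYTHONPATH") else extra
subprocess.Popen(args=[sys.executable, "step_script.py"], env=env)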