diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 630f2830..3ee39ab1 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -222,7 +222,8 @@ class PipelineController(object): artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] output_uri=None, # type: Optional[Union[str, bool]] skip_global_imports=False, # type: bool - working_dir=None # type: Optional[str] + working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> None """ @@ -317,6 +318,11 @@ class PipelineController(object): the steps from a functions, otherwise all global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False :param working_dir: Working directory to launch the pipeline from. + :param enable_local_imports: If True, allow pipeline steps to import from local files + by appending to the PYTHONPATH of each step the directory the pipeline controller + script resides in (sys.path[0]). + If False, the directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. """ if auto_version_bump is not None: warnings.warn("PipelineController.auto_version_bump is deprecated. 
It will be ignored", DeprecationWarning) @@ -353,6 +359,7 @@ class PipelineController(object): self._artifact_serialization_function = artifact_serialization_function self._artifact_deserialization_function = artifact_deserialization_function self._skip_global_imports = skip_global_imports + self._enable_local_imports = enable_local_imports if not self._task: pipeline_project_args = self._create_pipeline_project_args(name, project) @@ -405,7 +412,7 @@ class PipelineController(object): # add direct link to the pipeline page if self._pipeline_as_sub_project and self._task: if add_run_number and self._task.running_locally(): - self._add_pipeline_name_run_number() + self._add_pipeline_name_run_number(self._task) # noinspection PyProtectedMember self._task.get_logger().report_text('ClearML pipeline page: {}'.format( '{}/pipelines/{}/experiments/{}'.format( @@ -2672,6 +2679,7 @@ class PipelineController(object): task_overrides=task_overrides, allow_caching=node.cache_executed_step, output_uri=node.output_uri, + enable_local_imports=self._enable_local_imports, **extra_args ) except Exception: @@ -3663,7 +3671,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] output_uri=None, # type: Optional[Union[str, bool]] skip_global_imports=False, # type: bool - working_dir=None # type: Optional[str] + working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> () """ @@ -3751,6 +3760,11 @@ class PipelineDecorator(PipelineController): global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False :param working_dir: Working directory to launch the pipeline from. + :param enable_local_imports: If True, allow pipeline steps to import from local files + by appending to the PYTHONPATH of each step the directory the pipeline controller + script resides in (sys.path[0]). 
+ If False, the directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. """ super(PipelineDecorator, self).__init__( name=name, @@ -3774,9 +3788,9 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=artifact_deserialization_function, output_uri=output_uri, skip_global_imports=skip_global_imports, - working_dir=working_dir + working_dir=working_dir, + enable_local_imports=enable_local_imports ) - # if we are in eager execution, make sure parent class knows it if self._eager_execution_instance: self._mock_execution = True @@ -4064,7 +4078,7 @@ class PipelineDecorator(PipelineController): artifact_serialization_function=self._artifact_serialization_function, artifact_deserialization_function=self._artifact_deserialization_function, skip_global_imports=self._skip_global_imports, - working_dir=working_dir, + working_dir=working_dir ) return task_definition @@ -4595,7 +4609,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] output_uri=None, # type: Optional[Union[str, bool]] skip_global_imports=False, # type: bool - working_dir=None # type: Optional[str] + working_dir=None, # type: Optional[str] + enable_local_imports=True # type: bool ): # type: (...) -> Callable """ @@ -4714,6 +4729,11 @@ class PipelineDecorator(PipelineController): global imports will be automatically imported in a safe manner at the beginning of each step’s execution. Default is False :param working_dir: Working directory to launch the pipeline from. + :param enable_local_imports: If True, allow pipeline steps to import from local files + by appending to the PYTHONPATH of each step the directory the pipeline controller + script resides in (sys.path[0]). + If False, the directory won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. 
""" def decorator_wrap(func): @@ -4762,7 +4782,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=artifact_deserialization_function, output_uri=output_uri, skip_global_imports=skip_global_imports, - working_dir=working_dir + working_dir=working_dir, + enable_local_imports=enable_local_imports ) ret_val = func(**pipeline_kwargs) LazyEvalWrapper.trigger_all_remote_references() @@ -4816,7 +4837,8 @@ class PipelineDecorator(PipelineController): artifact_deserialization_function=artifact_deserialization_function, output_uri=output_uri, skip_global_imports=skip_global_imports, - working_dir=working_dir + working_dir=working_dir, + enable_local_imports=enable_local_imports ) a_pipeline._args_map = args_map or {} diff --git a/clearml/automation/job.py b/clearml/automation/job.py index 235f23c1..0498c25d 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -522,6 +522,7 @@ class ClearmlJob(BaseJob): allow_caching=False, # type: bool target_project=None, # type: Optional[str] output_uri=None, # type: Optional[Union[str, bool]] + enable_local_imports=True, # type: bool **kwargs # type: Any ): # type: (...) -> () @@ -548,9 +549,14 @@ class ClearmlJob(BaseJob): :param Union[str, bool] output_uri: The storage / output url for this job. This is the default location for output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). :param str target_project: Optional, Set the target project name to create the cloned Task in. + :param enable_local_imports: If True, allow jobs to import from local files + by appending sys.path[0] to PYTHONPATH. + If False, sys.path[0] won't be appended to PYTHONPATH. Default is True. + Ignored while running remotely. 
""" super(ClearmlJob, self).__init__() base_temp_task = Task.get_task(task_id=base_task_id) + self._enable_local_imports = enable_local_imports if disable_clone_task: self.task = base_temp_task task_status = self.task.status @@ -717,6 +723,15 @@ class LocalClearmlJob(ClearmlJob): env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id) env['CLEARML_LOG_TASK_TO_BACKEND'] = '1' env['CLEARML_SIMULATE_REMOTE_TASK'] = '1' + try: + if self._enable_local_imports: + current_python_path = env.get("PYTHONPATH") + # join with os.pathsep (':' on POSIX, ';' on Windows) rather than a hard-coded ':' + env["PYTHONPATH"] = ( + os.pathsep.join((current_python_path, sys.path[0])) if current_python_path else sys.path[0] + ) + except Exception as e: + logger.warning("Could not append local path to PYTHONPATH: {}".format(e)) self.task.mark_started() self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env) return True