diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index 83acbddf..9cbac4a2 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -1,39 +1,40 @@ +import inspect import json import os import re +import tempfile from functools import reduce from logging import getLogger -from typing import Optional, Sequence, Union, Tuple, List - -from six.moves.urllib.parse import urlparse +from typing import Optional, Sequence, Union, Tuple, List, Callable, Dict, Any from pathlib2 import Path +from six.moves.urllib.parse import urlparse -from ...task import Task from .repo import ScriptInfo +from ...task import Task class CreateAndPopulate(object): def __init__( self, - project_name=None, # Optional[str] - task_name=None, # Optional[str] - task_type=None, # Optional[str] - repo=None, # Optional[str] - branch=None, # Optional[str] - commit=None, # Optional[str] - script=None, # Optional[str] - working_directory=None, # Optional[str] - packages=None, # Optional[Union[bool, Sequence[str]]] - requirements_file=None, # Optional[Union[str, Path]] - docker=None, # Optional[str] - docker_args=None, # Optional[str] - docker_bash_setup_script=None, # Optional[str] - output_uri=None, # Optional[str] - base_task_id=None, # Optional[str] - add_task_init_call=True, # bool - raise_on_missing_entries=False, # bool - verbose=False, # bool + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + repo=None, # type: Optional[str] + branch=None, # type: Optional[str] + commit=None, # type: Optional[str] + script=None, # type: Optional[str] + working_directory=None, # type: Optional[str] + packages=None, # type: Optional[Union[bool, Sequence[str]]] + requirements_file=None, # type: Optional[Union[str, Path]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: 
Optional[str] + output_uri=None, # type: Optional[str] + base_task_id=None, # type: Optional[str] + add_task_init_call=True, # type: bool + raise_on_missing_entries=False, # type: bool + verbose=False, # type: bool ): # type: (...) -> None """ @@ -139,8 +140,11 @@ class CreateAndPopulate(object): repo_info, requirements = ScriptInfo.get( filepaths=[entry_point], log=getLogger(), - create_requirements=self.packages is True, uncommitted_from_remote=True, - detect_jupyter_notebook=False) + create_requirements=self.packages is True, + uncommitted_from_remote=True, + detect_jupyter_notebook=False, + add_missing_installed_packages=True, + ) # check if we have no repository and no requirements raise error if self.raise_on_missing_entries and (not self.requirements_file and not self.packages) \ @@ -409,3 +413,139 @@ class CreateAndPopulate(object): break return found_index if found_index < 0 else lines[found_index][0] + + +def create_task_from_function( + a_function, # type: Callable + function_kwargs=None, # type: Optional[Dict[str, Any]] + function_input_artifacts=None, # type: Optional[Dict[str, str]] + function_results=None, # type: Optional[List[str]] + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + packages=None, # type: Optional[Sequence[str]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + output_uri=None, # type: Optional[str] +): + # type: (...) 
-> Optional[Task] + """ + Create a Task from a function, including wrapping the function input arguments + into the hyper-parameter section as kwargs, and storing function results as named artifacts + + Example: + def mock_func(a=6, b=9): + c = a*b + print(a, b, c) + return c, c**2 + + create_task_from_function(mock_func, function_results=['mul', 'square']) + + Example arguments from other Tasks (artifact): + def mock_func(matrix_np): + c = matrix_np*matrix_np + print(matrix_np, c) + return c + + create_task_from_function( + mock_func, + function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'}, + function_results=['square_matrix'] + ) + + :param a_function: A global function to convert into a standalone Task + :param function_kwargs: Optional, provide subset of function arguments and default values to expose. + If not provided automatically take all function arguments & defaults + :param function_input_artifacts: Optional, pass input arguments to the function from other Tasks's output artifact. + Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`: + {'numpy_matrix': 'aabbcc.answer'} + :param function_results: Provide a list of names for all the results. + If not provided no results will be stored as artifacts. + :param project_name: Set the project name for the task. Required if base_task_id is None. + :param task_name: Set the name of the remote task. Required if base_task_id is None. + :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', + 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] + If not provided, packages are automatically added based on the imports used in the function. 
+ :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param output_uri: Optional, set the Task's output_uri (Storage destination). + examples: 's3://bucket/folder', 'https://server/' , 'gs://bucket/folder', 'azure://bucket', '/folder/' + :return: Newly created Task object + """ + function_name = str(a_function.__name__) + function_source = inspect.getsource(a_function) + function_input_artifacts = function_input_artifacts or dict() + # verify artifact kwargs: + if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()): + raise ValueError( + 'function_input_artifacts={}, it must be in the format: ' + '{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts) + ) + + if function_kwargs is None: + function_kwargs = dict() + inspect_args = inspect.getfullargspec(a_function) + if inspect_args and inspect_args.args: + inspect_defaults = inspect_args.defaults + if inspect_defaults and len(inspect_defaults) != len(inspect_args.args): + getLogger().warning( + 'Ignoring default argument values: ' + 'could not find all default values for: \'{}\''.format(function_name)) + inspect_defaults = [] + + function_kwargs = {str(k): v for k, v in zip(inspect_args.args, inspect_defaults)} \ + if inspect_defaults else {str(k): None for k in inspect_args.args} + + task_template = """ +from clearml import Task + +{function_source} + +if __name__ == '__main__': + task = Task.init() + kwargs = {function_kwargs} + task.connect(kwargs, name='kwargs') + function_input_artifacts = {function_input_artifacts} + if function_input_artifacts: + task.connect(function_input_artifacts, name='kwargs_artifacts') + for k, v in function_input_artifacts.items(): + if not v: + continue + task_id, artifact_name = v.split('.', 1) + kwargs[k] = 
Task.get_task(task_id=task_id).artifacts[artifact_name].get() + results = {function_name}(**kwargs) + result_names = {function_results} + if results and result_names: + for name, artifact in zip(result_names, results): + task.upload_artifact(name=name, artifact_object=artifact) + + """.format( + function_source=function_source, + function_kwargs=function_kwargs, + function_input_artifacts=function_input_artifacts, + function_name=function_name, + function_results=function_results) + + with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file: + temp_file.write(task_template) + temp_file.flush() + + populate = CreateAndPopulate( + project_name=project_name, + task_name=task_name, + task_type=task_type, + script=temp_file.name, + packages=packages if packages is not None else True, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + output_uri=output_uri, + add_task_init_call=False, + ) + task = populate.create_task() + task.update_task(task_data={'script': {'entry_point': '{}.py'.format(function_name)}}) + return task diff --git a/clearml/backend_interface/task/repo/scriptinfo.py b/clearml/backend_interface/task/repo/scriptinfo.py index 4b906770..7ff3b620 100644 --- a/clearml/backend_interface/task/repo/scriptinfo.py +++ b/clearml/backend_interface/task/repo/scriptinfo.py @@ -32,7 +32,7 @@ class ScriptRequirements(object): def __init__(self, root_folder): self._root_folder = root_folder - def get_requirements(self, entry_point_filename=None): + def get_requirements(self, entry_point_filename=None, add_missing_installed_packages=False): # noinspection PyBroadException try: from ....utilities.pigar.reqs import get_installed_pkgs_detail @@ -44,6 +44,10 @@ class ScriptRequirements(object): 'site-packages', 'dist-packages']) reqs, try_imports, guess, local_pks = gr.extract_reqs( module_callback=ScriptRequirements.add_trains_used_packages, entry_point_filename=entry_point_filename) + if add_missing_installed_packages 
and guess: + for k in guess: + if k not in reqs: + reqs[k] = guess[k] return self.create_requirements_txt(reqs, local_pks) except Exception as ex: _logger.warning("Failed auto-generating package requirements: {}".format(ex)) @@ -177,13 +181,13 @@ class ScriptRequirements(object): for k, v in reqs.sorted_items(): if k in ignored_packages or k.lower() in ignored_packages: continue - version = v.version + version = v.version if v else None if k in forced_packages: forced_version = forced_packages.pop(k, None) if forced_version is not None: version = forced_version # requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()]) - requirements_txt += ScriptRequirements._make_req_line(k, version) + requirements_txt += ScriptRequirements._make_req_line(k, version or None) # add forced requirements that we could not find installed on the system for k in sorted(forced_packages.keys()): @@ -204,6 +208,8 @@ class ScriptRequirements(object): requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()]) for k, v in reqs.sorted_items(): + if not v: + continue requirements_txt += '\n' if k == '-e': requirements_txt += '# IMPORT PACKAGE {0} {1}\n'.format(k, v.version) @@ -704,7 +710,7 @@ class ScriptInfo(object): @classmethod def _get_script_info( cls, filepaths, check_uncommitted=True, create_requirements=True, log=None, - uncommitted_from_remote=False, detect_jupyter_notebook=True): + uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False): jupyter_filepath = cls._get_jupyter_notebook_filename() if detect_jupyter_notebook else None if jupyter_filepath: scripts_path = [Path(os.path.normpath(jupyter_filepath)).absolute()] @@ -786,7 +792,10 @@ class ScriptInfo(object): script_requirements = ScriptRequirements( Path(repo_root).as_posix() if repo_info.url else script_path.as_posix()) if create_requirements: - requirements, conda_requirements = script_requirements.get_requirements() + 
requirements, conda_requirements = script_requirements.get_requirements( + entry_point_filename=script_path.as_posix() + if not repo_info.url and script_path.is_file() else None, + add_missing_installed_packages=add_missing_installed_packages) else: script_requirements = None @@ -818,7 +827,7 @@ class ScriptInfo(object): @classmethod def get(cls, filepaths=None, check_uncommitted=True, create_requirements=True, log=None, - uncommitted_from_remote=False, detect_jupyter_notebook=True): + uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False): try: if not filepaths: filepaths = [sys.argv[0], ] @@ -828,6 +837,7 @@ class ScriptInfo(object): create_requirements=create_requirements, log=log, uncommitted_from_remote=uncommitted_from_remote, detect_jupyter_notebook=detect_jupyter_notebook, + add_missing_installed_packages=add_missing_installed_packages, ) except SystemExit: pass