Change: CreateAndPopulate will now automatically list packages that are imported by the script but not installed locally.

Add `create_task_from_function` to the Task populate utilities
This commit is contained in:
allegroai 2021-08-15 22:29:50 +03:00
parent 69a85924b0
commit aacc9826a8
2 changed files with 180 additions and 30 deletions

View File

@ -1,39 +1,40 @@
import inspect
import json
import os
import re
import tempfile
from functools import reduce
from logging import getLogger
from typing import Optional, Sequence, Union, Tuple, List
from six.moves.urllib.parse import urlparse
from typing import Optional, Sequence, Union, Tuple, List, Callable, Dict, Any
from pathlib2 import Path
from six.moves.urllib.parse import urlparse
from ...task import Task
from .repo import ScriptInfo
from ...task import Task
class CreateAndPopulate(object):
def __init__(
self,
project_name=None, # Optional[str]
task_name=None, # Optional[str]
task_type=None, # Optional[str]
repo=None, # Optional[str]
branch=None, # Optional[str]
commit=None, # Optional[str]
script=None, # Optional[str]
working_directory=None, # Optional[str]
packages=None, # Optional[Union[bool, Sequence[str]]]
requirements_file=None, # Optional[Union[str, Path]]
docker=None, # Optional[str]
docker_args=None, # Optional[str]
docker_bash_setup_script=None, # Optional[str]
output_uri=None, # Optional[str]
base_task_id=None, # Optional[str]
add_task_init_call=True, # bool
raise_on_missing_entries=False, # bool
verbose=False, # bool
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
script=None, # type: Optional[str]
working_directory=None, # type: Optional[str]
packages=None, # type: Optional[Union[bool, Sequence[str]]]
requirements_file=None, # type: Optional[Union[str, Path]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
base_task_id=None, # type: Optional[str]
add_task_init_call=True, # type: bool
raise_on_missing_entries=False, # type: bool
verbose=False, # type: bool
):
# type: (...) -> None
"""
@ -139,8 +140,11 @@ class CreateAndPopulate(object):
repo_info, requirements = ScriptInfo.get(
filepaths=[entry_point],
log=getLogger(),
create_requirements=self.packages is True, uncommitted_from_remote=True,
detect_jupyter_notebook=False)
create_requirements=self.packages is True,
uncommitted_from_remote=True,
detect_jupyter_notebook=False,
add_missing_installed_packages=True,
)
# check if we have no repository and no requirements raise error
if self.raise_on_missing_entries and (not self.requirements_file and not self.packages) \
@ -409,3 +413,139 @@ class CreateAndPopulate(object):
break
return found_index if found_index < 0 else lines[found_index][0]
def create_task_from_function(
        a_function,  # type: Callable
        function_kwargs=None,  # type: Optional[Dict[str, Any]]
        function_input_artifacts=None,  # type: Optional[Dict[str, str]]
        function_results=None,  # type: Optional[List[str]]
        project_name=None,  # type: Optional[str]
        task_name=None,  # type: Optional[str]
        task_type=None,  # type: Optional[str]
        packages=None,  # type: Optional[Sequence[str]]
        docker=None,  # type: Optional[str]
        docker_args=None,  # type: Optional[str]
        docker_bash_setup_script=None,  # type: Optional[str]
        output_uri=None,  # type: Optional[str]
):
    # type: (...) -> Optional[Task]
    """
    Create a Task from a function, including wrapping the function input arguments
    into the hyper-parameter section as kwargs, and storing function results as named artifacts

    Example:
        def mock_func(a=6, b=9):
            c = a*b
            print(a, b, c)
            return c, c**2

        create_task_from_function(mock_func, function_results=['mul', 'square'])

    Example arguments from other Tasks (artifact):
        def mock_func(matrix_np):
            c = matrix_np*matrix_np
            print(matrix_np, c)
            return c

        create_task_from_function(
            mock_func,
            function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
            function_results=['square_matrix']
        )

    :param a_function: A global function to convert into a standalone Task
    :param function_kwargs: Optional, provide subset of function arguments and default values to expose.
        If not provided automatically take all function arguments & defaults
    :param function_input_artifacts: Optional, pass input arguments to the function from
        other Tasks' output artifacts.
        Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
        {'numpy_matrix': 'aabbcc.answer'}
    :param function_results: Provide a list of names for all the results.
        If not provided no results will be stored as artifacts.
    :param project_name: Set the project name for the task. Required if base_task_id is None.
    :param task_name: Set the name of the remote task. Required if base_task_id is None.
    :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
        'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
    :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
        If not provided, packages are automatically added based on the imports used in the function.
    :param docker: Select the docker image to be executed in by the remote session
    :param docker_args: Add docker arguments, pass a single string
    :param docker_bash_setup_script: Add bash script to be executed
        inside the docker before setting up the Task's environment
    :param output_uri: Optional, set the Task's output_uri (Storage destination).
        examples: 's3://bucket/folder', 'https://server/' , 'gs://bucket/folder', 'azure://bucket', '/folder/'
    :return: Newly created Task object
    """
    function_name = str(a_function.__name__)
    function_source = inspect.getsource(a_function)
    function_input_artifacts = function_input_artifacts or dict()
    # verify artifact kwargs: every value must be "task_id.artifact_name"
    if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()):
        raise ValueError(
            'function_input_artifacts={}, it must be in the format: '
            '{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts)
        )

    if function_kwargs is None:
        # auto-detect the function arguments and their default values
        function_kwargs = dict()
        inspect_args = inspect.getfullargspec(a_function)
        if inspect_args and inspect_args.args:
            inspect_defaults = inspect_args.defaults
            if inspect_defaults and len(inspect_defaults) != len(inspect_args.args):
                # defaults only cover the trailing arguments; we cannot map them all,
                # so fall back to exposing every argument with a None default
                getLogger().warning(
                    'Ignoring default argument values: '
                    'could not find all default values for: \'{}\''.format(function_name))
                inspect_defaults = []
            function_kwargs = {str(k): v for k, v in zip(inspect_args.args, inspect_defaults)} \
                if inspect_defaults else {str(k): None for k in inspect_args.args}

    # standalone script executed remotely: connects kwargs as hyper-parameters,
    # resolves artifact-based inputs, calls the function and uploads its results
    task_template = """from clearml import Task

{function_source}

if __name__ == '__main__':
    task = Task.init()
    kwargs = {function_kwargs}
    task.connect(kwargs, name='kwargs')
    function_input_artifacts = {function_input_artifacts}
    if function_input_artifacts:
        task.connect(function_input_artifacts, name='kwargs_artifacts')
        for k, v in function_input_artifacts.items():
            if not v:
                continue
            task_id, artifact_name = v.split('.', 1)
            kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get()
    results = {function_name}(**kwargs)
    result_names = {function_results}
    if results is not None and result_names:
        # a single (non tuple/list) return value still maps to the first result name
        if not isinstance(results, (tuple, list)):
            results = [results]
        for name, artifact in zip(result_names, results):
            task.upload_artifact(name=name, artifact_object=artifact)
""".format(
        function_source=function_source,
        function_kwargs=function_kwargs,
        function_input_artifacts=function_input_artifacts,
        function_name=function_name,
        function_results=function_results)

    with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file:
        temp_file.write(task_template)
        temp_file.flush()

        populate = CreateAndPopulate(
            project_name=project_name,
            task_name=task_name,
            task_type=task_type,
            script=temp_file.name,
            packages=packages if packages is not None else True,
            docker=docker,
            docker_args=docker_args,
            docker_bash_setup_script=docker_bash_setup_script,
            output_uri=output_uri,
            add_task_init_call=False,
        )
        # task must be created while the temp file still exists on disk
        task = populate.create_task()
        # rename the entry point after the wrapped function for readability
        task.update_task(task_data={'script': {'entry_point': '{}.py'.format(function_name)}})
    return task

View File

@ -32,7 +32,7 @@ class ScriptRequirements(object):
def __init__(self, root_folder):
self._root_folder = root_folder
def get_requirements(self, entry_point_filename=None):
def get_requirements(self, entry_point_filename=None, add_missing_installed_packages=False):
# noinspection PyBroadException
try:
from ....utilities.pigar.reqs import get_installed_pkgs_detail
@ -44,6 +44,10 @@ class ScriptRequirements(object):
'site-packages', 'dist-packages'])
reqs, try_imports, guess, local_pks = gr.extract_reqs(
module_callback=ScriptRequirements.add_trains_used_packages, entry_point_filename=entry_point_filename)
if add_missing_installed_packages and guess:
for k in guess:
if k not in reqs:
reqs[k] = guess[k]
return self.create_requirements_txt(reqs, local_pks)
except Exception as ex:
_logger.warning("Failed auto-generating package requirements: {}".format(ex))
@ -177,13 +181,13 @@ class ScriptRequirements(object):
for k, v in reqs.sorted_items():
if k in ignored_packages or k.lower() in ignored_packages:
continue
version = v.version
version = v.version if v else None
if k in forced_packages:
forced_version = forced_packages.pop(k, None)
if forced_version is not None:
version = forced_version
# requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()])
requirements_txt += ScriptRequirements._make_req_line(k, version)
requirements_txt += ScriptRequirements._make_req_line(k, version or None)
# add forced requirements that we could not find installed on the system
for k in sorted(forced_packages.keys()):
@ -204,6 +208,8 @@ class ScriptRequirements(object):
requirements_txt += ''.join(['# {0}\n'.format(c) for c in v.comments.sorted_items()])
for k, v in reqs.sorted_items():
if not v:
continue
requirements_txt += '\n'
if k == '-e':
requirements_txt += '# IMPORT PACKAGE {0} {1}\n'.format(k, v.version)
@ -704,7 +710,7 @@ class ScriptInfo(object):
@classmethod
def _get_script_info(
cls, filepaths, check_uncommitted=True, create_requirements=True, log=None,
uncommitted_from_remote=False, detect_jupyter_notebook=True):
uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False):
jupyter_filepath = cls._get_jupyter_notebook_filename() if detect_jupyter_notebook else None
if jupyter_filepath:
scripts_path = [Path(os.path.normpath(jupyter_filepath)).absolute()]
@ -786,7 +792,10 @@ class ScriptInfo(object):
script_requirements = ScriptRequirements(
Path(repo_root).as_posix() if repo_info.url else script_path.as_posix())
if create_requirements:
requirements, conda_requirements = script_requirements.get_requirements()
requirements, conda_requirements = script_requirements.get_requirements(
entry_point_filename=script_path.as_posix()
if not repo_info.url and script_path.is_file() else None,
add_missing_installed_packages=add_missing_installed_packages)
else:
script_requirements = None
@ -818,7 +827,7 @@ class ScriptInfo(object):
@classmethod
def get(cls, filepaths=None, check_uncommitted=True, create_requirements=True, log=None,
uncommitted_from_remote=False, detect_jupyter_notebook=True):
uncommitted_from_remote=False, detect_jupyter_notebook=True, add_missing_installed_packages=False):
try:
if not filepaths:
filepaths = [sys.argv[0], ]
@ -828,6 +837,7 @@ class ScriptInfo(object):
create_requirements=create_requirements, log=log,
uncommitted_from_remote=uncommitted_from_remote,
detect_jupyter_notebook=detect_jupyter_notebook,
add_missing_installed_packages=add_missing_installed_packages,
)
except SystemExit:
pass