Add IDE to the task's runtime properties

Improve type annotations and docstrings
Flake8
This commit is contained in:
allegroai 2022-12-13 15:59:42 +02:00
parent 3cb66650e8
commit d3de2151e8
2 changed files with 99 additions and 9 deletions

View File

@ -271,6 +271,13 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
preview=diff_preview, preview=diff_preview,
) )
# add ide info into task runtime_properties
# noinspection PyBroadException
try:
self._set_runtime_properties(runtime_properties={"IDE": result.script["ide"]})
except Exception as ex:
self.log.info("Failed logging ide information: {}".format(ex))
# store original entry point # store original entry point
entry_point = result.script.get('entry_point') if result.script else None entry_point = result.script.get('entry_point') if result.script else None
@ -364,6 +371,11 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@property @property
def storage_uri(self): def storage_uri(self):
# type: () -> Optional[str] # type: () -> Optional[str]
"""
The storage / output url for this task. This is the default location for output models and other artifacts.
:return: The url string or None if not set.
"""
if self._storage_uri: if self._storage_uri:
return self._storage_uri return self._storage_uri
if running_remotely(): if running_remotely():
@ -374,41 +386,83 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@storage_uri.setter @storage_uri.setter
def storage_uri(self, value): def storage_uri(self, value):
# type: (str) -> () # type: (str) -> ()
"""
Set the storage / output url for this task. This is the default location for output models and other artifacts.
:param str value: The value to set for output URI.
"""
self._set_storage_uri(value) self._set_storage_uri(value)
@property @property
def task_id(self): def task_id(self):
# type: () -> str # type: () -> str
"""
Returns the current Task's ID.
"""
return self.id return self.id
@property @property
def name(self): def name(self):
# type: () -> str # type: () -> str
"""
Returns the current Task's name.
"""
return self.data.name or '' return self.data.name or ''
@name.setter @name.setter
def name(self, value): def name(self, value):
# type: (str) -> () # type: (str) -> ()
"""
Set the current Task's name.
:param str value: Name to set.
"""
self.set_name(value) self.set_name(value)
@property @property
def task_type(self): def task_type(self):
# type: () -> str # type: () -> str
"""
Returns the current Task's type.
Valid task types:
- ``TaskTypes.training`` (default)
- ``TaskTypes.testing``
- ``TaskTypes.inference``
- ``TaskTypes.data_processing``
- ``TaskTypes.application``
- ``TaskTypes.monitor``
- ``TaskTypes.controller``
- ``TaskTypes.optimizer``
- ``TaskTypes.service``
- ``TaskTypes.qc``
- ``TaskTypes.custom``
"""
return self.data.type return self.data.type
@property @property
def project(self): def project(self):
# type: () -> str # type: () -> str
"""
Returns the current Task's project name.
"""
return self.data.project return self.data.project
@property @property
def parent(self): def parent(self):
# type: () -> str # type: () -> str
"""
Returns the current Task's parent task ID (str).
"""
return self.data.parent return self.data.parent
@property @property
def input_models_id(self): def input_models_id(self):
# type: () -> Mapping[str, str] # type: () -> Mapping[str, str]
"""
Returns the current Task's input model IDs as a dictionary.
"""
if not Session.check_min_api_version("2.13"): if not Session.check_min_api_version("2.13"):
model_id = self._get_task_property('execution.model', raise_on_error=False) model_id = self._get_task_property('execution.model', raise_on_error=False)
return {'Input Model': model_id} if model_id else {} return {'Input Model': model_id} if model_id else {}
@ -419,6 +473,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@property @property
def output_models_id(self): def output_models_id(self):
# type: () -> Mapping[str, str] # type: () -> Mapping[str, str]
"""
Returns the current Task's output model IDs as a dictionary.
"""
if not Session.check_min_api_version("2.13"): if not Session.check_min_api_version("2.13"):
model_id = self._get_task_property('output.model', raise_on_error=False) model_id = self._get_task_property('output.model', raise_on_error=False)
return {'Output Model': model_id} if model_id else {} return {'Output Model': model_id} if model_id else {}
@ -429,11 +486,19 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@property @property
def comment(self): def comment(self):
# type: () -> str # type: () -> str
"""
Returns the current Task's (user defined) comments.
"""
return self.data.comment or '' return self.data.comment or ''
@comment.setter @comment.setter
def comment(self, value): def comment(self, value):
# type: (str) -> () # type: (str) -> ()
"""
Set the comment of the task. Please note that this will override any comment currently
present. If you want to add lines to the comment field, get the comments first, add your
own and then set them again.
"""
self.set_comment(value) self.set_comment(value)
@property @property
@ -605,7 +670,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def mark_completed(self, ignore_errors=True, status_message=None, force=False): def mark_completed(self, ignore_errors=True, status_message=None, force=False):
# type: (bool, Optional[str], bool) -> () # type: (bool, Optional[str], bool) -> ()
""" """
Manually mark a Task as completed Manually mark a Task as completed. This will close the running process and will change the Task's status
to Completed (Use this function to close and change status of remotely executed tasks).
To simply change the Task's status to completed, use task.close()
:param bool ignore_errors: If True (default), ignore any errors raised :param bool ignore_errors: If True (default), ignore any errors raised
:param bool force: If True the task status will be changed to `stopped` regardless of the current Task state. :param bool force: If True the task status will be changed to `stopped` regardless of the current Task state.
@ -633,7 +700,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def publish(self, ignore_errors=True): def publish(self, ignore_errors=True):
# type: (bool) -> () # type: (bool) -> ()
""" The signal that this Task will be published """ """ The signal that this task will be published """
if str(self.status) not in (str(tasks.TaskStatusEnum.stopped), str(tasks.TaskStatusEnum.completed)): if str(self.status) not in (str(tasks.TaskStatusEnum.stopped), str(tasks.TaskStatusEnum.completed)):
raise ValueError("Can't publish, Task is not stopped") raise ValueError("Can't publish, Task is not stopped")
resp = self.send(tasks.PublishRequest(self.id), ignore_errors=ignore_errors) resp = self.send(tasks.PublishRequest(self.id), ignore_errors=ignore_errors)
@ -1493,6 +1560,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def set_project(self, project_id=None, project_name=None): def set_project(self, project_id=None, project_name=None):
# type: (Optional[str], Optional[str]) -> () # type: (Optional[str], Optional[str]) -> ()
"""
Set the project of the current task by either specifying a project name or ID
"""
# if running remotely and we are the main task, skip setting ourselves. # if running remotely and we are the main task, skip setting ourselves.
if self._is_remote_main_task(): if self._is_remote_main_task():
@ -1511,6 +1581,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def get_project_name(self): def get_project_name(self):
# type: () -> Optional[str] # type: () -> Optional[str]
"""
Get the current Task's project name.
"""
if self.project is None: if self.project is None:
return self._project_name[1] if self._project_name and len(self._project_name) > 1 else None return self._project_name[1] if self._project_name and len(self._project_name) > 1 else None
@ -1525,6 +1598,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def get_project_object(self): def get_project_object(self):
# type: () -> dict # type: () -> dict
""" Get the current Task's project as a python object. """
if self.project is None: if self.project is None:
return self._project_object[1] if self._project_object and len(self._project_object) > 1 else None return self._project_object[1] if self._project_object and len(self._project_object) > 1 else None
@ -1539,6 +1613,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def get_tags(self): def get_tags(self):
# type: () -> Sequence[str] # type: () -> Sequence[str]
""" Get all current Task's tags."""
return self._get_task_property("tags") return self._get_task_property("tags")
def set_system_tags(self, tags): def set_system_tags(self, tags):
@ -1558,6 +1633,11 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
def set_tags(self, tags): def set_tags(self, tags):
# type: (Sequence[str]) -> () # type: (Sequence[str]) -> ()
"""
Set the current Task's tags. Please note this will overwrite anything that is there already.
:param Sequence(str) tags: Any sequence of tags to set.
"""
assert isinstance(tags, (list, tuple)) assert isinstance(tags, (list, tuple))
if not Session.check_min_api_version('2.3'): if not Session.check_min_api_version('2.3'):
# not supported # not supported
@ -1949,7 +2029,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@classmethod @classmethod
def get_projects(cls, **kwargs): def get_projects(cls, **kwargs):
# type: () -> (List['projects.Project']) # type: (**Any) -> (List['projects.Project'])
""" """
Return a list of projects in the system, sorted by last updated time Return a list of projects in the system, sorted by last updated time
@ -2029,7 +2109,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
try: try:
import pkg_resources import pkg_resources
except ImportError: except ImportError:
get_logger("task").warning("Requirement file %s skipped since pkg_resources is not installed" % package_name) get_logger("task").warning(
"Requirement file %s skipped since pkg_resources is not installed" % package_name)
else: else:
with Path(package_name).open() as requirements_txt: with Path(package_name).open() as requirements_txt:
for req in pkg_resources.parse_requirements(requirements_txt): for req in pkg_resources.parse_requirements(requirements_txt):
@ -2223,7 +2304,8 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
# Since we ae using forced update, make sure he task status is valid # Since we ae using forced update, make sure he task status is valid
status = self._data.status if self._data and self._reload_skip_flag else self.data.status status = self._data.status if self._data and self._reload_skip_flag else self.data.status
if not kwargs.pop("force", False) and status not in (tasks.TaskStatusEnum.created, tasks.TaskStatusEnum.in_progress): if not kwargs.pop("force", False) and \
status not in (tasks.TaskStatusEnum.created, tasks.TaskStatusEnum.in_progress):
# the exception being name/comment that we can always change. # the exception being name/comment that we can always change.
if kwargs and all( if kwargs and all(
k in ("name", "project", "comment", "tags", "system_tags", "runtime") for k in kwargs.keys() k in ("name", "project", "comment", "tags", "system_tags", "runtime") for k in kwargs.keys()
@ -2269,8 +2351,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self.data.script = script self.data.script = script
self._edit(script=script) self._edit(script=script)
def _set_configuration(self, name, description=None, config_type=None, config_text=None, config_dict=None, **kwargs): def _set_configuration(self, name, description=None, config_type=None, config_text=None, config_dict=None,
# type: (str, Optional[str], Optional[str], Optional[str], Optional[Union[Mapping, list]]) -> None **kwargs):
# type: (str, Optional[str], Optional[str], Optional[str], Optional[Union[Mapping, list]], **Any) -> None
""" """
Set Task configuration text/dict. Multiple configurations are supported. Set Task configuration text/dict. Multiple configurations are supported.
@ -2463,6 +2546,13 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
@classmethod @classmethod
def get_by_name(cls, task_name): def get_by_name(cls, task_name):
# type: (str) -> Task # type: (str) -> Task
"""
Returns the most recent task with the given name from anywhere in the system as a Task object.
:param str task_name: The name of the task to search for.
:return: Task object of the most recent task with that name.
"""
res = cls._send(cls._get_default_session(), tasks.GetAllRequest(name=exact_match_regex(task_name))) res = cls._send(cls._get_default_session(), tasks.GetAllRequest(name=exact_match_regex(task_name)))
task = get_single_result(entity='task', query=task_name, results=res.response.tasks) task = get_single_result(entity='task', query=task_name, results=res.response.tasks)
@ -2481,7 +2571,6 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
page = -1 page = -1
page_size = 500 page_size = 500
all_responses = [] all_responses = []
res = None
while True: while True:
page += 1 page += 1
res = cls._send( res = cls._send(

View File

@ -1760,7 +1760,8 @@ class Task(_Task):
def close(self): def close(self):
""" """
Close the current Task. Enables you to manually shutdown the task. Closes the current Task and changes its status to completed.
Enables you to manually shutdown the task.
.. warning:: .. warning::
Only call :meth:`Task.close` if you are certain the Task is not needed. Only call :meth:`Task.close` if you are certain the Task is not needed.