clearml/trains/automation/job.py

384 lines
12 KiB
Python
Raw Normal View History

import hashlib
from datetime import datetime
from logging import getLogger
from time import time, sleep
2020-05-24 05:16:12 +00:00
from typing import Optional, Mapping, Sequence, Any
from ..task import Task
from ..backend_api.services import tasks as tasks_service
2020-05-24 05:16:12 +00:00
logger = getLogger('trains.automation.job')
class TrainsJob(object):
2020-05-24 05:16:12 +00:00
def __init__(
self,
base_task_id, # type: str
parameter_override=None, # type: Optional[Mapping[str, str]]
task_overrides=None, # type: Optional[Mapping[str, str]]
tags=None, # type: Optional[Sequence[str]]
parent=None, # type: Optional[str]
**kwargs # type: Any
):
# type: (...) -> ()
"""
Create a new Task based in a base_task_id with a different set of parameters
:param str base_task_id: base task id to clone from
:param dict parameter_override: dictionary of parameters and values to set fo the cloned task
:param dict task_overrides: Task object specific overrides
:param list tags: additional tags to add to the newly cloned task
:param str parent: Set newly created Task parent task field, default: base_tak_id.
:param dict kwargs: additional Task creation parameters
"""
self.task = Task.clone(base_task_id, parent=parent or base_task_id, **kwargs)
if tags:
self.task.set_tags(list(set(self.task.get_tags()) | set(tags)))
self.task_parameter_override = None
if parameter_override:
params = self.task.get_parameters(backwards_compatibility=False)
params.update(parameter_override)
self.task.set_parameters(params)
self.task_parameter_override = dict(**parameter_override)
if task_overrides:
# todo: make sure it works
# noinspection PyProtectedMember
2020-05-24 05:16:12 +00:00
self.task._edit(**task_overrides)
self.task_started = False
self._worker = None
def get_metric(self, title, series):
2020-05-24 05:16:12 +00:00
# type: (str, str) -> (float, float, float)
"""
Retrieve a specific scalar metric from the running Task.
:param str title: Graph title (metric)
:param str series: Series on the specific graph (variant)
2020-06-11 18:15:40 +00:00
:return: A tuple of min value, max value, last value
"""
metrics, title, series, values = self.get_metric_req_params(title, series)
res = self.task.send(
tasks_service.GetAllRequest(
id=[self.task.id],
page=0,
page_size=1,
only_fields=['id', ] + metrics
)
)
response = res.wait()
return tuple(response.response_data['tasks'][0]['last_metrics'][title][series][v] for v in values)
@staticmethod
def get_metric_req_params(title, series):
title = hashlib.md5(str(title).encode('utf-8')).hexdigest()
series = hashlib.md5(str(series).encode('utf-8')).hexdigest()
metric = 'last_metrics.{}.{}.'.format(title, series)
values = ['min_value', 'max_value', 'value']
metrics = [metric + v for v in values]
return metrics, title, series, values
def launch(self, queue_name=None):
2020-05-24 05:16:12 +00:00
# type: (str) -> ()
"""
Send Job for execution on the requested execution queue
:param str queue_name:
"""
try:
Task.enqueue(task=self.task, queue_name=queue_name)
except Exception as ex:
logger.warning(ex)
def abort(self):
2020-05-24 05:16:12 +00:00
# type: () -> ()
"""
Abort currently running job (can be called multiple times)
"""
try:
self.task.stopped()
except Exception as ex:
logger.warning(ex)
def elapsed(self):
2020-05-24 05:16:12 +00:00
# type: () -> float
"""
Return the time in seconds since job started. Return -1 if job is still pending
2020-06-11 18:15:40 +00:00
:return: Seconds from start.
"""
if not self.task_started and str(self.task.status) != Task.TaskStatusEnum.in_progress:
return -1
self.task_started = True
2020-05-31 09:07:12 +00:00
if not self.task.data.started:
self.task.reload()
if not self.task.data.started:
return -1
2020-05-31 09:07:12 +00:00
return (datetime.now(tz=self.task.data.started.tzinfo) - self.task.data.started).total_seconds()
def iterations(self):
2020-05-24 05:16:12 +00:00
# type: () -> int
"""
Return the last iteration value of the current job. -1 if job has not started yet
2020-06-11 18:15:40 +00:00
:return: Task last iteration.
"""
if not self.task_started and self.task.status != Task.TaskStatusEnum.in_progress:
return -1
self.task_started = True
return self.task.get_last_iteration()
def task_id(self):
2020-05-24 05:16:12 +00:00
# type: () -> str
"""
Return the Task id.
2020-06-11 18:15:40 +00:00
:return: The Task ID.
"""
return self.task.id
def status(self):
# type: () -> str
"""
Return the Job Task current status, see Task.TaskStatusEnum
2020-06-11 18:15:40 +00:00
:return: Task status Task.TaskStatusEnum in string.
"""
return self.task.status
def wait(self, timeout=None, pool_period=30.):
2020-05-24 05:16:12 +00:00
# type: (Optional[float], float) -> bool
"""
2020-06-11 18:15:40 +00:00
Wait until the task is fully executed (i.e., aborted/completed/failed)
:param timeout: maximum time (minutes) to wait for Task to finish
:param pool_period: check task status every pool_period seconds
2020-06-11 18:15:40 +00:00
:return: True, if Task finished.
"""
tic = time()
2020-05-24 05:16:12 +00:00
while timeout is None or time() - tic < timeout * 60.:
if self.is_stopped():
return True
sleep(pool_period)
return self.is_stopped()
def get_console_output(self, number_of_reports=1):
2020-05-24 05:16:12 +00:00
# type: (int) -> Sequence[str]
"""
Return a list of console outputs reported by the Task.
Returned console outputs are retrieved from the most updated console outputs.
:param int number_of_reports: number of reports to return, default 1, the last (most updated) console output
2020-06-11 18:15:40 +00:00
:return: List of strings each entry corresponds to one report.
"""
return self.task.get_reported_console_output(number_of_reports=number_of_reports)
def worker(self):
2020-05-24 05:16:12 +00:00
# type: () -> str
"""
Return the current worker id executing this Job. If job is pending, returns None
2020-06-11 18:15:40 +00:00
:return: ID of the worker executing / executed the job, or None if job is still pending.
"""
if self.is_pending():
return self._worker
if self._worker is None:
# the last console outputs will update the worker
self.get_console_output(number_of_reports=1)
# if we still do not have it, store empty string
if not self._worker:
self._worker = ''
return self._worker
def is_running(self):
2020-05-24 05:16:12 +00:00
# type: () -> bool
"""
2020-06-11 18:15:40 +00:00
Return True, if job is currently running (pending is considered False)
2020-06-11 18:15:40 +00:00
:return: True, if the task is currently in progress.
"""
return self.task.status == Task.TaskStatusEnum.in_progress
def is_stopped(self):
2020-05-24 05:16:12 +00:00
# type: () -> bool
"""
2020-06-11 18:15:40 +00:00
Return True, if job is has executed and is not any more
:return: True the task is currently one of these states, stopped / completed / failed / published.
"""
return self.task.status in (
Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published)
def is_failed(self):
# type: () -> bool
"""
Return True, if job is has executed and failed
:return: True the task is currently in failed state
"""
return self.task.status in (Task.TaskStatusEnum.failed, )
def is_completed(self):
# type: () -> bool
"""
Return True, if job is has executed and completed successfully
:return: True the task is currently in completed or published state
"""
return self.task.status in (Task.TaskStatusEnum.completed, Task.TaskStatusEnum.published)
def is_aborted(self):
# type: () -> bool
"""
Return True, if job is has executed and aborted
:return: True the task is currently in aborted state
"""
return self.task.status in (Task.TaskStatusEnum.stopped, )
def is_pending(self):
2020-05-24 05:16:12 +00:00
# type: () -> bool
"""
2020-06-11 18:15:40 +00:00
Return True, if job is waiting for execution
2020-06-11 18:15:40 +00:00
:return: True the task is currently is currently queued.
"""
return self.task.status in (Task.TaskStatusEnum.queued, Task.TaskStatusEnum.created)
def started(self):
# type: () -> bool
"""
2020-06-11 18:15:40 +00:00
Return True, if job already started, or ended. False, if created/pending.
2020-06-11 18:15:40 +00:00
:return: False, if the task is currently in draft mode or pending.
"""
if not self.task_started and self.task.status in (
Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created):
return False
self.task_started = True
return True
2020-05-24 05:16:12 +00:00
# noinspection PyMethodMayBeStatic, PyUnusedLocal
class _JobStub(object):
"""
This is a Job Stub, use only for debugging
"""
2020-07-04 19:52:09 +00:00
2020-05-24 05:16:12 +00:00
def __init__(
self,
base_task_id, # type: str
parameter_override=None, # type: Optional[Mapping[str, str]]
task_overrides=None, # type: Optional[Mapping[str, str]]
tags=None, # type: Optional[Sequence[str]]
**kwargs # type: Any
):
# type: (...) -> ()
self.task = None
self.base_task_id = base_task_id
self.parameter_override = parameter_override
self.task_overrides = task_overrides
self.tags = tags
self.iteration = -1
self.task_started = None
def launch(self, queue_name=None):
2020-05-24 05:16:12 +00:00
# type: (str) -> ()
self.iteration = 0
self.task_started = time()
print('launching', self.parameter_override, 'in', queue_name)
def abort(self):
2020-05-24 05:16:12 +00:00
# type: () -> ()
self.task_started = -1
def elapsed(self):
2020-05-24 05:16:12 +00:00
# type: () -> float
"""
Return the time in seconds since job started. Return -1 if job is still pending
2020-06-11 18:15:40 +00:00
:return: Seconds from start.
"""
if self.task_started is None:
return -1
return time() - self.task_started
def iterations(self):
2020-05-24 05:16:12 +00:00
# type: () -> int
"""
Return the last iteration value of the current job. -1 if job has not started yet
2020-06-11 18:15:40 +00:00
:return: Task last iteration.
"""
if self.task_started is None:
return -1
return self.iteration
def get_metric(self, title, series):
2020-05-24 05:16:12 +00:00
# type: (str, str) -> (float, float, float)
"""
Retrieve a specific scalar metric from the running Task.
:param str title: Graph title (metric)
:param str series: Series on the specific graph (variant)
2020-06-11 18:15:40 +00:00
:return: min value, max value, last value
"""
return 0, 1.0, 0.123
def task_id(self):
2020-05-24 05:16:12 +00:00
# type: () -> str
return 'stub'
def worker(self):
2020-05-24 05:16:12 +00:00
# type: () -> ()
return None
def status(self):
2020-05-24 05:16:12 +00:00
# type: () -> str
return 'in_progress'
def wait(self, timeout=None, pool_period=30.):
2020-05-24 05:16:12 +00:00
# type: (Optional[float], float) -> bool
"""
2020-06-11 18:15:40 +00:00
Wait for the task to be processed (i.e., aborted/completed/failed)
:param timeout: maximum time (minutes) to wait for Task to finish
:param pool_period: check task status every pool_period seconds
2020-06-11 18:15:40 +00:00
:return: True, if the Task finished.
"""
return True
def get_console_output(self, number_of_reports=1):
2020-05-24 05:16:12 +00:00
# type: (int) -> Sequence[str]
"""
Return a list of console outputs reported by the Task.
Returned console outputs are retrieved from the most updated console outputs.
:param int number_of_reports: number of reports to return, default 1, the last (most updated) console output
2020-06-11 18:15:40 +00:00
:return: List of strings each entry corresponds to one report.
"""
return []
def is_running(self):
2020-05-24 05:16:12 +00:00
# type: () -> bool
return self.task_started is not None and self.task_started > 0
def is_stopped(self):
2020-05-24 05:16:12 +00:00
# type: () -> bool
return self.task_started is not None and self.task_started < 0
def is_pending(self):
2020-05-24 05:16:12 +00:00
# type: () -> bool
return self.task_started is None
def started(self):
# type: () -> bool
return not self.is_pending()