Add TriggerScheduler example and fix pep8

allegroai 2021-08-25 16:36:14 +03:00
parent 4333eed965
commit 8f48d5c884
4 changed files with 176 additions and 100 deletions

clearml/automation/__init__.py

@@ -3,7 +3,8 @@ from .optimization import GridSearch, RandomSearch, HyperParameterOptimizer, Obj
 from .job import ClearmlJob
 from .controller import PipelineController
 from .scheduler import TaskScheduler
+from .trigger import TriggerScheduler

 __all__ = ["UniformParameterRange", "DiscreteParameterRange", "UniformIntegerParameterRange", "ParameterSet",
            "GridSearch", "RandomSearch", "HyperParameterOptimizer", "Objective", "ClearmlJob", "PipelineController",
-           "TaskScheduler"]
+           "TaskScheduler", "TriggerScheduler"]

clearml/automation/scheduler.py

@@ -3,7 +3,7 @@ import logging
 from datetime import datetime
 from threading import Thread, enumerate as enumerate_threads
 from time import sleep, time
-from typing import List, Union, Optional, Callable, Sequence
+from typing import List, Union, Optional, Callable, Sequence, Dict

 from attr import attrs, attrib
 from dateutil.relativedelta import relativedelta
@@ -31,6 +31,7 @@ class BaseScheduleJob(object):
                 if not callable(v) and (full or not str(k).startswith('_'))}

     def update(self, a_job):
+        # type: (Union[Dict, BaseScheduleJob]) -> BaseScheduleJob
         converters = {a.name: a.converter for a in getattr(self, '__attrs_attrs__', [])}
         for k, v in (a_job.to_dict(full=True) if not isinstance(a_job, dict) else a_job).items():
             if v is not None and not callable(getattr(self, k, v)):
@@ -38,7 +39,7 @@ class BaseScheduleJob(object):
         return self

     def verify(self):
-        # type () -> None
+        # type: () -> None
         if self.base_function and not self.name:
             raise ValueError("Entry 'name' must be supplied for function scheduling")
         if self.base_task_id and not self.queue:
@@ -47,11 +48,11 @@ class BaseScheduleJob(object):
             raise ValueError("Either schedule function or task-id must be provided")

     def get_last_executed_task_id(self):
-        # type () -> Optional[str]
+        # type: () -> Optional[str]
         return self._executed_instances[-1] if self._executed_instances else None

     def run(self, task_id):
-        # type (Optional[str]) -> datetime
+        # type: (Optional[str]) -> None
         if task_id:
             # make sure we have a new instance
             if not self._executed_instances:
@@ -77,23 +78,42 @@ class ScheduleJob(BaseScheduleJob):
     _last_executed = attrib(type=datetime, converter=datetime_from_isoformat, default=None)

     def verify(self):
-        # type () -> None
+        # type: () -> None
+        def check_integer(value):
+            try:
+                return False if not isinstance(value, (int, float)) or \
+                    int(value) != float(value) else True
+            except (TypeError, ValueError):
+                return False
+
         super(ScheduleJob, self).verify()
         if self.weekdays and self.day not in (None, 0, 1):
             raise ValueError("`weekdays` and `day` combination is not valid (day must be None,0 or 1)")
+        if self.weekdays and any(w not in self._weekdays_ind for w in self.weekdays):
+            raise ValueError("`weekdays` must be a list of strings, valid values are: {}".format(self._weekdays_ind))
         if not (self.minute or self.hour or self.day or self.month or self.year):
             raise ValueError("Schedule time/date was not provided")
+        if self.minute and not check_integer(self.minute):
+            raise ValueError("Schedule `minute` must be an integer")
+        if self.hour and not check_integer(self.hour):
+            raise ValueError("Schedule `hour` must be an integer")
+        if self.day and not check_integer(self.day):
+            raise ValueError("Schedule `day` must be an integer")
+        if self.month and not check_integer(self.month):
+            raise ValueError("Schedule `month` must be an integer")
+        if self.year and not check_integer(self.year):
+            raise ValueError("Schedule `year` must be an integer")

     def next_run(self):
-        # type () -> Optional[datetime]
+        # type: () -> Optional[datetime]
         return self._next_run

     def get_execution_timeout(self):
-        # type () -> Optional[datetime]
+        # type: () -> Optional[datetime]
         return self._execution_timeout

     def next(self):
-        # type () -> Optional[datetime]
+        # type: () -> Optional[datetime]
         """
         :return: Return the next run datetime, None if no scheduling needed
         """
@@ -211,7 +231,7 @@ class ScheduleJob(BaseScheduleJob):
         return self._next_run

     def run(self, task_id):
-        # type (Optional[str]) -> datetime
+        # type: (Optional[str]) -> datetime
         super(ScheduleJob, self).run(task_id)
         self._last_executed = datetime.utcnow()
         if self.execution_limit_hours and task_id:
@@ -484,25 +504,26 @@ class TaskScheduler(BaseScheduler):
     def add_task(
             self,
-            schedule_task_id=None,  # type(Union[str, Task])
-            schedule_function=None,  # type(Callable)
-            queue=None,  # type(str)
-            name=None,  # type(Optional[str])
-            target_project=None,  # type(Optional[str])
-            minute=None,  # type(Optional[int])
-            hour=None,  # type(Optional[int])
-            day=None,  # type(Optional[int])
-            weekdays=None,  # type(Optional[List[str]])
-            month=None,  # type(Optional[int])
-            year=None,  # type(Optional[int])
-            limit_execution_time=None,  # type(Optional[float])
-            single_instance=False,  # type(bool)
-            recurring=True,  # type(bool)
-            reuse_task=False,  # type(bool)
-            task_parameters=None,  # type(Optional[dict])
-            task_overrides=None,  # type(Optional[dict])
+            schedule_task_id=None,  # type: Union[str, Task]
+            schedule_function=None,  # type: Callable
+            queue=None,  # type: str
+            name=None,  # type: Optional[str]
+            target_project=None,  # type: Optional[str]
+            minute=None,  # type: Optional[int]
+            hour=None,  # type: Optional[int]
+            day=None,  # type: Optional[int]
+            weekdays=None,  # type: Optional[List[str]]
+            month=None,  # type: Optional[int]
+            year=None,  # type: Optional[int]
+            limit_execution_time=None,  # type: Optional[float]
+            single_instance=False,  # type: bool
+            recurring=True,  # type: bool
+            execute_immediately=True,  # type: bool
+            reuse_task=False,  # type: bool
+            task_parameters=None,  # type: Optional[dict]
+            task_overrides=None,  # type: Optional[dict]
     ):
-        # type(...) -> bool
+        # type: (...) -> bool
         """
         Create a cron job alike scheduling for a pre existing Task.
         Notice it is recommended to give the schedule entry a descriptive unique name,
@@ -545,6 +566,8 @@ class TaskScheduler(BaseScheduler):
         :param single_instance: If True, do not launch the Task job if the previous instance is still running
             (skip until the next scheduled time period). Default False.
         :param recurring: If False only launch the Task once (default: True, repeat)
+        :param execute_immediately: If True, schedule the Task to be executed immediately,
+            then recurring based on the timing schedule arguments
         :param reuse_task: If True, re-enqueue the same Task (i.e. do not clone it) every time, default False.
         :param task_parameters: Configuration parameters to the executed Task.
             for example: {'Args/batch': '12'} Notice: not available when reuse_task=True/
@@ -569,7 +592,7 @@ class TaskScheduler(BaseScheduler):
             task_parameters=task_parameters,
             task_overrides=task_overrides,
             clone_task=not bool(reuse_task),
-            starting_time=datetime.utcnow(),
+            starting_time=datetime.fromtimestamp(0) if execute_immediately else datetime.utcnow(),
             minute=minute,
             hour=hour,
             day=day,
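
Backdating starting_time to the epoch is what implements execute_immediately: the first computed slot is already in the past, so the job launches on the next scheduler poll and then keeps recurring on the normal schedule. A hedged usage sketch (task ID and queue name are placeholders):

    scheduler = TaskScheduler()
    scheduler.add_task(
        name='nightly model training',
        schedule_task_id='<task-id-to-clone>',  # placeholder
        queue='default',                        # placeholder queue name
        minute=30, hour=22, day=1,              # intended as a daily 22:30 (UTC) run
        execute_immediately=True,               # also launch one run right away
    )
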
@@ -746,7 +769,7 @@ class TaskScheduler(BaseScheduler):
     @staticmethod
     def __deserialize_scheduled_jobs(serialized_jobs_dicts, current_jobs):
-        # type(List[Dict], List[ScheduleJob]) -> List[ScheduleJob]
+        # type: (List[Dict], List[ScheduleJob]) -> List[ScheduleJob]
         scheduled_jobs = [ScheduleJob().update(j) for j in serialized_jobs_dicts]
         scheduled_jobs = {j.name: j for j in scheduled_jobs}
         current_scheduled_jobs = {j.name: j for j in current_jobs}
@@ -841,9 +864,9 @@ class TaskScheduler(BaseScheduler):
             table_plot=executed_table
         )

-    def _launch_job_task(self, job):
-        # type: (ScheduleJob) -> None
-        task_job = super(TaskScheduler, self)._launch_job_task(job)
+    def _launch_job_task(self, job, task_parameters=None, add_tags=None):
+        # type: (ScheduleJob, Optional[dict], Optional[List[str]]) -> Optional[ClearmlJob]
+        task_job = super(TaskScheduler, self)._launch_job_task(job, task_parameters=task_parameters, add_tags=add_tags)
         # make sure this is not a function job
         if task_job:
             self._executed_jobs.append(ExecutedJob(
@@ -852,12 +875,15 @@ class TaskScheduler(BaseScheduler):
             if job.get_execution_timeout():
                 # we should probably make sure we are not overwriting a Task
                 self._timeout_jobs[job.get_execution_timeout()] = task_job.task_id()
+        return task_job

-    def _launch_job_function(self, job):
-        # type: (ScheduleJob) -> None
-        thread_job = super(TaskScheduler, self)._launch_job_function(job)
+    def _launch_job_function(self, job, func_args=None):
+        # type: (ScheduleJob, Optional[Sequence]) -> Optional[Thread]
+        thread_job = super(TaskScheduler, self)._launch_job_function(job, func_args=func_args)
         # make sure this is not a function job
         if thread_job:
             self._executed_jobs.append(ExecutedJob(
                 name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow()))
             # execution timeout is not supported with function callbacks.
+        return thread_job
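
Both launch helpers now return the object they created (the ClearmlJob, or the Thread running the callback) so callers such as TriggerScheduler can post-process the launched job. For comparison with the trigger example added at the end of this commit, a function-based TaskScheduler job might look like the sketch below (names are illustrative; per BaseScheduleJob.verify above, a function job needs a name but no queue):

    from clearml.automation import TaskScheduler

    def hourly_report():
        # the callback runs in a thread inside the scheduler process
        print('collecting report ...')

    if __name__ == '__main__':
        scheduler = TaskScheduler()
        scheduler.add_task(
            schedule_function=hourly_report,
            name='hourly report',  # mandatory for function jobs
            hour=1,                # intended as an hourly schedule
        )
        scheduler.start_remotely()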

clearml/automation/trigger.py

@@ -2,7 +2,7 @@ import json
 import logging
 from datetime import datetime
 from threading import enumerate as enumerate_threads
-from typing import List, Optional, Dict
+from typing import List, Optional, Dict, Union, Callable

 from attr import attrs, attrib
@@ -40,7 +40,7 @@ class BaseTrigger(BaseScheduleJob):
         }

     def verify(self):
-        # type () -> None
+        # type: () -> None
         super(BaseTrigger, self).verify()
         if self.tags and (not isinstance(self.tags, (list, tuple)) or
                           not all(isinstance(s, str) for s in self.tags)):
@@ -109,7 +109,7 @@ class TaskTrigger(BaseTrigger):
     threshold = attrib(default=None, type=float)
     value_sign = attrib(default=None, type=str)
     exclude_dev = attrib(default=None, type=bool)
-    on_status = attrib(type=str, default=None)
+    on_status = attrib(type=list, default=None)

     def build_query(self, ref_time):
         query = super(TaskTrigger, self).build_query(ref_time)
@@ -127,7 +127,7 @@ class TaskTrigger(BaseTrigger):
         return query

     def verify(self):
-        # type () -> None
+        # type: () -> None
         super(TaskTrigger, self).verify()
         if (self.metrics or self.variant or self.threshold is not None) and \
                 not (self.metrics and self.variant and self.threshold is not None):
@@ -192,24 +192,24 @@ class TriggerScheduler(BaseScheduler):
     def add_model_trigger(
             self,
-            schedule_task_id=None,  # type(Union[str, Task])
-            schedule_queue=None,  # type(str)
-            schedule_function=None,  # type(Callable)
-            trigger_project=None,  # type(str)
-            trigger_name=None,  # type(Optional[str])
-            trigger_on_publish=None,  # type(bool)
-            trigger_on_tags=None,  # type(Optional[List[str]])
-            trigger_on_archive=None,  # type(bool)
-            trigger_required_tags=None,  # type(Optional[List[str]])
-            name=None,  # type(Optional[str])
-            target_project=None,  # type(Optional[str])
-            add_tag=True,  # type(Union[bool, str])
-            single_instance=False,  # type(bool)
-            reuse_task=False,  # type(bool)
-            task_parameters=None,  # type(Optional[dict])
-            task_overrides=None,  # type(Optional[dict])
+            schedule_task_id=None,  # type: Union[str, Task]
+            schedule_queue=None,  # type: str
+            schedule_function=None,  # type: Callable[[str], None]
+            trigger_project=None,  # type: str
+            trigger_name=None,  # type: Optional[str]
+            trigger_on_publish=None,  # type: bool
+            trigger_on_tags=None,  # type: Optional[List[str]]
+            trigger_on_archive=None,  # type: bool
+            trigger_required_tags=None,  # type: Optional[List[str]]
+            name=None,  # type: Optional[str]
+            target_project=None,  # type: Optional[str]
+            add_tag=True,  # type: Union[bool, str]
+            single_instance=False,  # type: bool
+            reuse_task=False,  # type: bool
+            task_parameters=None,  # type: Optional[dict]
+            task_overrides=None,  # type: Optional[dict]
     ):
-        # type(...) -> bool
+        # type: (...) -> None
         """
         Create a cron job alike scheduling for a pre existing Task or function.
         Trigger the Task/function execution on changes in the model repository
@@ -270,24 +270,24 @@ class TriggerScheduler(BaseScheduler):
     def add_dataset_trigger(
             self,
-            schedule_task_id=None,  # type(Union[str, Task])
-            schedule_queue=None,  # type(str)
-            schedule_function=None,  # type(Callable)
-            trigger_project=None,  # type(str)
-            trigger_name=None,  # type(Optional[str])
-            trigger_on_publish=None,  # type(bool)
-            trigger_on_tags=None,  # type(Optional[List[str]])
-            trigger_on_archive=None,  # type(bool)
-            trigger_required_tags=None,  # type(Optional[List[str]])
-            name=None,  # type(Optional[str])
-            target_project=None,  # type(Optional[str])
-            add_tag=True,  # type(Union[bool, str])
-            single_instance=False,  # type(bool)
-            reuse_task=False,  # type(bool)
-            task_parameters=None,  # type(Optional[dict])
-            task_overrides=None,  # type(Optional[dict])
+            schedule_task_id=None,  # type: Union[str, Task]
+            schedule_queue=None,  # type: str
+            schedule_function=None,  # type: Callable[[str], None]
+            trigger_project=None,  # type: str
+            trigger_name=None,  # type: Optional[str]
+            trigger_on_publish=None,  # type: bool
+            trigger_on_tags=None,  # type: Optional[List[str]]
+            trigger_on_archive=None,  # type: bool
+            trigger_required_tags=None,  # type: Optional[List[str]]
+            name=None,  # type: Optional[str]
+            target_project=None,  # type: Optional[str]
+            add_tag=True,  # type: Union[bool, str]
+            single_instance=False,  # type: bool
+            reuse_task=False,  # type: bool
+            task_parameters=None,  # type: Optional[dict]
+            task_overrides=None,  # type: Optional[dict]
     ):
-        # type(...) -> bool
+        # type: (...) -> None
         """
         Create a cron job alike scheduling for a pre existing Task or function.
         Trigger the Task/function execution on changes in the dataset repository (notice this is not the hyper-datasets)
@@ -349,28 +349,28 @@ class TriggerScheduler(BaseScheduler):
     def add_task_trigger(
             self,
-            schedule_task_id=None,  # type(Union[str, Task])
-            schedule_queue=None,  # type(str)
-            schedule_function=None,  # type(Callable)
-            trigger_project=None,  # type(str)
-            trigger_name=None,  # type(Optional[str])
-            trigger_on_tags=None,  # type(Optional[List[str]])
-            trigger_on_status=None,  # type(Optional[List[str]])
-            trigger_exclude_dev_tasks=None,  # type(Optional[bool])
-            trigger_on_metric=None,  # type(Optional[str])
-            trigger_on_variant=None,  # type(Optional[str])
-            trigger_on_threshold=None,  # type(Optional[str])
-            trigger_on_sign=None,  # type(Optional[str])
-            trigger_required_tags=None,  # type(Optional[List[str]])
-            name=None,  # type(Optional[str])
-            target_project=None,  # type(Optional[str])
-            add_tag=True,  # type(Union[bool, str])
-            single_instance=False,  # type(bool)
-            reuse_task=False,  # type(bool)
-            task_parameters=None,  # type(Optional[dict])
-            task_overrides=None,  # type(Optional[dict])
+            schedule_task_id=None,  # type: Union[str, Task]
+            schedule_queue=None,  # type: str
+            schedule_function=None,  # type: Callable[[str], None]
+            trigger_project=None,  # type: str
+            trigger_name=None,  # type: Optional[str]
+            trigger_on_tags=None,  # type: Optional[List[str]]
+            trigger_on_status=None,  # type: Optional[List[str]]
+            trigger_exclude_dev_tasks=None,  # type: Optional[bool]
+            trigger_on_metric=None,  # type: Optional[str]
+            trigger_on_variant=None,  # type: Optional[str]
+            trigger_on_threshold=None,  # type: Optional[float]
+            trigger_on_sign=None,  # type: Optional[str]
+            trigger_required_tags=None,  # type: Optional[List[str]]
+            name=None,  # type: Optional[str]
+            target_project=None,  # type: Optional[str]
+            add_tag=True,  # type: Union[bool, str]
+            single_instance=False,  # type: bool
+            reuse_task=False,  # type: bool
+            task_parameters=None,  # type: Optional[dict]
+            task_overrides=None,  # type: Optional[dict]
     ):
-        # type(...) -> bool
+        # type: (...) -> None
         """
         Create a cron job alike scheduling for a pre existing Task or function.
         Trigger the Task/function execution on changes in the Task
@@ -570,8 +570,8 @@ class TriggerScheduler(BaseScheduler):
     @staticmethod
     def __deserialize_triggers(trigger_jobs, trigger_class, current_triggers):
-        # type (List[dict], TriggerCLass, List[BaseTrigger]) -> List[BaseTrigger]
-        trigger_jobs = [trigger_class().update(j) for j in trigger_jobs]
+        # type: (List[dict], BaseTrigger, List[BaseTrigger]) -> List[BaseTrigger]
+        trigger_jobs = [trigger_class().update(j) for j in trigger_jobs]  # noqa
         trigger_jobs = {j.name: j for j in trigger_jobs}
         current_triggers = {j.name: j for j in current_triggers}
@@ -617,17 +617,17 @@ class TriggerScheduler(BaseScheduler):
         state_dict = json.loads(state_json_str)
         self._dataset_triggers = self.__deserialize_triggers(
             state_dict.get('dataset_triggers', []),
-            trigger_class=DatasetTrigger,
+            trigger_class=DatasetTrigger,  # noqa
             current_triggers=self._dataset_triggers
         )
         self._model_triggers = self.__deserialize_triggers(
             state_dict.get('model_triggers', []),
-            trigger_class=ModelTrigger,
+            trigger_class=ModelTrigger,  # noqa
             current_triggers=self._model_triggers
         )
         self._task_triggers = self.__deserialize_triggers(
             state_dict.get('task_triggers', []),
-            trigger_class=TaskTrigger,
+            trigger_class=TaskTrigger,  # noqa
             current_triggers=self._task_triggers
         )

New example file (49 lines): TriggerScheduler usage

@@ -0,0 +1,49 @@
+from clearml import Task, Dataset, Model
+from clearml.automation import TriggerScheduler
+
+
+def trigger_model_func(model_id):
+    model = Model(model_id=model_id)
+    print('model id {} modified'.format(model.id))
+
+
+def trigger_dataset_func(dataset_id):
+    dataset = Dataset.get(dataset_id=dataset_id)
+    print('dataset id {} created'.format(dataset.id))
+
+
+def trigger_task_func(task_id):
+    task = Task.get_task(task_id=task_id)
+    print('Task ID {} metric above threshold'.format(task.id))
+
+
+if __name__ == '__main__':
+    trigger = TriggerScheduler(pooling_frequency_minutes=3.0)
+    trigger.add_model_trigger(
+        name='model deploy',
+        schedule_function=trigger_model_func,
+        trigger_project='examples',
+        trigger_on_tags=['deploy']
+    )
+    trigger.add_model_trigger(
+        name='model quality check',
+        schedule_task_id='add_task_id_here',
+        schedule_queue='default',
+        trigger_project='examples',
+        trigger_on_tags=['deploy']
+    )
+    trigger.add_dataset_trigger(
+        name='retrain on dataset',
+        schedule_function=trigger_dataset_func,
+        trigger_project='datasets',
+        trigger_on_tags=['retrain']
+    )
+    trigger.add_task_trigger(
+        name='performance high-score',
+        schedule_function=trigger_task_func,
+        trigger_project='examples',
+        trigger_on_metric='epoch_accuracy', trigger_on_variant='epoch_accuracy',
+        trigger_on_sign='max',
+        trigger_on_threshold=0.99
+    )
+    trigger.start_remotely()
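
The example ends by enqueuing the trigger scheduler itself as a ClearML task with start_remotely(); when iterating locally, the polling loop can presumably also be run in-process (assuming the blocking start() entry point inherited from BaseScheduler):

    # local-debugging alternative to the last line above
    trigger.start()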