diff --git a/clearml/automation/__init__.py b/clearml/automation/__init__.py index cd77a233..fb7b7196 100644 --- a/clearml/automation/__init__.py +++ b/clearml/automation/__init__.py @@ -3,7 +3,8 @@ from .optimization import GridSearch, RandomSearch, HyperParameterOptimizer, Obj from .job import ClearmlJob from .controller import PipelineController from .scheduler import TaskScheduler +from .trigger import TriggerScheduler __all__ = ["UniformParameterRange", "DiscreteParameterRange", "UniformIntegerParameterRange", "ParameterSet", "GridSearch", "RandomSearch", "HyperParameterOptimizer", "Objective", "ClearmlJob", "PipelineController", - "TaskScheduler"] + "TaskScheduler", "TriggerScheduler"] diff --git a/clearml/automation/scheduler.py b/clearml/automation/scheduler.py index 7251d8ee..870cc212 100644 --- a/clearml/automation/scheduler.py +++ b/clearml/automation/scheduler.py @@ -3,7 +3,7 @@ import logging from datetime import datetime from threading import Thread, enumerate as enumerate_threads from time import sleep, time -from typing import List, Union, Optional, Callable, Sequence +from typing import List, Union, Optional, Callable, Sequence, Dict from attr import attrs, attrib from dateutil.relativedelta import relativedelta @@ -31,6 +31,7 @@ class BaseScheduleJob(object): if not callable(v) and (full or not str(k).startswith('_'))} def update(self, a_job): + # type: (Union[Dict, BaseScheduleJob]) -> BaseScheduleJob converters = {a.name: a.converter for a in getattr(self, '__attrs_attrs__', [])} for k, v in (a_job.to_dict(full=True) if not isinstance(a_job, dict) else a_job).items(): if v is not None and not callable(getattr(self, k, v)): @@ -38,7 +39,7 @@ class BaseScheduleJob(object): return self def verify(self): - # type () -> None + # type: () -> None if self.base_function and not self.name: raise ValueError("Entry 'name' must be supplied for function scheduling") if self.base_task_id and not self.queue: @@ -47,11 +48,11 @@ class BaseScheduleJob(object): 
raise ValueError("Either schedule function or task-id must be provided") def get_last_executed_task_id(self): - # type () -> Optional[str] + # type: () -> Optional[str] return self._executed_instances[-1] if self._executed_instances else None def run(self, task_id): - # type (Optional[str]) -> datetime + # type: (Optional[str]) -> None if task_id: # make sure we have a new instance if not self._executed_instances: @@ -77,23 +78,42 @@ class ScheduleJob(BaseScheduleJob): _last_executed = attrib(type=datetime, converter=datetime_from_isoformat, default=None) def verify(self): - # type () -> None + # type: () -> None + def check_integer(value): + try: + return False if not isinstance(value, (int, float)) or \ + int(value) != float(value) else True + except (TypeError, ValueError): + return False + super(ScheduleJob, self).verify() if self.weekdays and self.day not in (None, 0, 1): raise ValueError("`weekdays` and `day` combination is not valid (day must be None,0 or 1)") + if self.weekdays and any(w not in self._weekdays_ind for w in self.weekdays): + raise ValueError("`weekdays` must be a list of strings, valid values are: {}".format(self._weekdays_ind)) if not (self.minute or self.hour or self.day or self.month or self.year): raise ValueError("Schedule time/date was not provided") + if self.minute and not check_integer(self.minute): + raise ValueError("Schedule `minute` must be an integer") + if self.hour and not check_integer(self.hour): + raise ValueError("Schedule `hour` must be an integer") + if self.day and not check_integer(self.day): + raise ValueError("Schedule `day` must be an integer") + if self.month and not check_integer(self.month): + raise ValueError("Schedule `month` must be an integer") + if self.year and not check_integer(self.year): + raise ValueError("Schedule `year` must be an integer") def next_run(self): - # type () -> Optional[datetime] + # type: () -> Optional[datetime] return self._next_run def get_execution_timeout(self): - # type () -> 
Optional[datetime] + # type: () -> Optional[datetime] return self._execution_timeout def next(self): - # type () -> Optional[datetime] + # type: () -> Optional[datetime] """ :return: Return the next run datetime, None if no scheduling needed """ @@ -211,7 +231,7 @@ class ScheduleJob(BaseScheduleJob): return self._next_run def run(self, task_id): - # type (Optional[str]) -> datetime + # type: (Optional[str]) -> datetime super(ScheduleJob, self).run(task_id) self._last_executed = datetime.utcnow() if self.execution_limit_hours and task_id: @@ -484,25 +504,26 @@ class TaskScheduler(BaseScheduler): def add_task( self, - schedule_task_id=None, # type(Union[str, Task]) - schedule_function=None, # type(Callable) - queue=None, # type(str) - name=None, # type(Optional[str]) - target_project=None, # type(Optional[str]) - minute=None, # type(Optional[int]) - hour=None, # type(Optional[int]) - day=None, # type(Optional[int]) - weekdays=None, # type(Optional[List[str]]) - month=None, # type(Optional[int]) - year=None, # type(Optional[int]) - limit_execution_time=None, # type(Optional[float]) - single_instance=False, # type(bool) - recurring=True, # type(bool) - reuse_task=False, # type(bool) - task_parameters=None, # type(Optional[dict]) - task_overrides=None, # type(Optional[dict]) + schedule_task_id=None, # type: Union[str, Task] + schedule_function=None, # type: Callable + queue=None, # type: str + name=None, # type: Optional[str] + target_project=None, # type: Optional[str] + minute=None, # type: Optional[int] + hour=None, # type: Optional[int] + day=None, # type: Optional[int] + weekdays=None, # type: Optional[List[str]] + month=None, # type: Optional[int] + year=None, # type: Optional[int] + limit_execution_time=None, # type: Optional[float] + single_instance=False, # type: bool + recurring=True, # type: bool + execute_immediately=True, # type: bool + reuse_task=False, # type: bool + task_parameters=None, # type: Optional[dict] + task_overrides=None, # type: 
Optional[dict] ): - # type(...) -> bool + # type: (...) -> bool """ Create a cron job alike scheduling for a pre existing Task. Notice it is recommended to give the schedule entry a descriptive unique name, @@ -545,6 +566,8 @@ class TaskScheduler(BaseScheduler): :param single_instance: If True, do not launch the Task job if the previous instance is still running (skip until the next scheduled time period). Default False. :param recurring: If False only launch the Task once (default: True, repeat) + :param execute_immediately: If True, schedule the Task to be executed immediately + then recurring based on the timing schedule arguments :param reuse_task: If True, re-enqueue the same Task (i.e. do not clone it) every time, default False. :param task_parameters: Configuration parameters to the executed Task. for example: {'Args/batch': '12'} Notice: not available when reuse_task=True/ @@ -569,7 +592,7 @@ class TaskScheduler(BaseScheduler): task_parameters=task_parameters, task_overrides=task_overrides, clone_task=not bool(reuse_task), - starting_time=datetime.utcnow(), + starting_time=datetime.fromtimestamp(0) if execute_immediately else datetime.utcnow(), minute=minute, hour=hour, day=day, @@ -746,7 +769,7 @@ class TaskScheduler(BaseScheduler): @staticmethod def __deserialize_scheduled_jobs(serialized_jobs_dicts, current_jobs): - # type(List[Dict], List[ScheduleJob]) -> List[ScheduleJob] + # type: (List[Dict], List[ScheduleJob]) -> List[ScheduleJob] scheduled_jobs = [ScheduleJob().update(j) for j in serialized_jobs_dicts] scheduled_jobs = {j.name: j for j in scheduled_jobs} current_scheduled_jobs = {j.name: j for j in current_jobs} @@ -841,9 +864,9 @@ class TaskScheduler(BaseScheduler): table_plot=executed_table ) - def _launch_job_task(self, job): - # type: (ScheduleJob) -> None - task_job = super(TaskScheduler, self)._launch_job_task(job) + def _launch_job_task(self, job, task_parameters=None, add_tags=None): + # type: (ScheduleJob, Optional[dict], 
Optional[List[str]]) -> Optional[ClearmlJob] + task_job = super(TaskScheduler, self)._launch_job_task(job, task_parameters=task_parameters, add_tags=add_tags) # make sure this is not a function job if task_job: self._executed_jobs.append(ExecutedJob( @@ -852,12 +875,15 @@ class TaskScheduler(BaseScheduler): if job.get_execution_timeout(): # we should probably make sure we are not overwriting a Task self._timeout_jobs[job.get_execution_timeout()] = task_job.task_id() + return task_job - def _launch_job_function(self, job): - # type: (ScheduleJob) -> None - thread_job = super(TaskScheduler, self)._launch_job_function(job) + def _launch_job_function(self, job, func_args=None): + # type: (ScheduleJob, Optional[Sequence]) -> Optional[Thread] + thread_job = super(TaskScheduler, self)._launch_job_function(job, func_args=func_args) # make sure this is not a function job if thread_job: self._executed_jobs.append(ExecutedJob( name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow())) # execution timeout is not supported with function callbacks. 
+ + return thread_job diff --git a/clearml/automation/trigger.py b/clearml/automation/trigger.py index 454b3679..b8ce3cb1 100644 --- a/clearml/automation/trigger.py +++ b/clearml/automation/trigger.py @@ -2,7 +2,7 @@ import json import logging from datetime import datetime from threading import enumerate as enumerate_threads -from typing import List, Optional, Dict +from typing import List, Optional, Dict, Union, Callable from attr import attrs, attrib @@ -40,7 +40,7 @@ class BaseTrigger(BaseScheduleJob): } def verify(self): - # type () -> None + # type: () -> None super(BaseTrigger, self).verify() if self.tags and (not isinstance(self.tags, (list, tuple)) or not all(isinstance(s, str) for s in self.tags)): @@ -109,7 +109,7 @@ class TaskTrigger(BaseTrigger): threshold = attrib(default=None, type=float) value_sign = attrib(default=None, type=str) exclude_dev = attrib(default=None, type=bool) - on_status = attrib(type=str, default=None) + on_status = attrib(type=list, default=None) def build_query(self, ref_time): query = super(TaskTrigger, self).build_query(ref_time) @@ -127,7 +127,7 @@ class TaskTrigger(BaseTrigger): return query def verify(self): - # type () -> None + # type: () -> None super(TaskTrigger, self).verify() if (self.metrics or self.variant or self.threshold is not None) and \ not (self.metrics and self.variant and self.threshold is not None): @@ -192,24 +192,24 @@ class TriggerScheduler(BaseScheduler): def add_model_trigger( self, - schedule_task_id=None, # type(Union[str, Task]) - schedule_queue=None, # type(str) - schedule_function=None, # type(Callable) - trigger_project=None, # type(str) - trigger_name=None, # type(Optional[str]) - trigger_on_publish=None, # type(bool) - trigger_on_tags=None, # type(Optional[List[str]]) - trigger_on_archive=None, # type(bool) - trigger_required_tags=None, # type(Optional[List[str]]) - name=None, # type(Optional[str]) - target_project=None, # type(Optional[str]) - add_tag=True, # type(Union[bool, str]) - 
single_instance=False, # type(bool) - reuse_task=False, # type(bool) - task_parameters=None, # type(Optional[dict]) - task_overrides=None, # type(Optional[dict]) + schedule_task_id=None, # type: Union[str, Task] + schedule_queue=None, # type: str + schedule_function=None, # type: Callable[[str], None] + trigger_project=None, # type: str + trigger_name=None, # type: Optional[str] + trigger_on_publish=None, # type: bool + trigger_on_tags=None, # type: Optional[List[str]] + trigger_on_archive=None, # type: bool + trigger_required_tags=None, # type: Optional[List[str]] + name=None, # type: Optional[str] + target_project=None, # type: Optional[str] + add_tag=True, # type: Union[bool, str] + single_instance=False, # type: bool + reuse_task=False, # type: bool + task_parameters=None, # type: Optional[dict] + task_overrides=None, # type: Optional[dict] ): - # type(...) -> bool + # type: (...) -> None """ Create a cron job alike scheduling for a pre existing Task or function. Trigger the Task/function execution on changes in the model repository @@ -270,24 +270,24 @@ class TriggerScheduler(BaseScheduler): def add_dataset_trigger( self, - schedule_task_id=None, # type(Union[str, Task]) - schedule_queue=None, # type(str) - schedule_function=None, # type(Callable) - trigger_project=None, # type(str) - trigger_name=None, # type(Optional[str]) - trigger_on_publish=None, # type(bool) - trigger_on_tags=None, # type(Optional[List[str]]) - trigger_on_archive=None, # type(bool) - trigger_required_tags=None, # type(Optional[List[str]]) - name=None, # type(Optional[str]) - target_project=None, # type(Optional[str]) - add_tag=True, # type(Union[bool, str]) - single_instance=False, # type(bool) - reuse_task=False, # type(bool) - task_parameters=None, # type(Optional[dict]) - task_overrides=None, # type(Optional[dict]) + schedule_task_id=None, # type: Union[str, Task] + schedule_queue=None, # type: str + schedule_function=None, # type: Callable[[str], None] + trigger_project=None, # type: 
str + trigger_name=None, # type: Optional[str] + trigger_on_publish=None, # type: bool + trigger_on_tags=None, # type: Optional[List[str]] + trigger_on_archive=None, # type: bool + trigger_required_tags=None, # type: Optional[List[str]] + name=None, # type: Optional[str] + target_project=None, # type: Optional[str] + add_tag=True, # type: Union[bool, str] + single_instance=False, # type: bool + reuse_task=False, # type: bool + task_parameters=None, # type: Optional[dict] + task_overrides=None, # type: Optional[dict] ): - # type(...) -> bool + # type: (...) -> None """ Create a cron job alike scheduling for a pre existing Task or function. Trigger the Task/function execution on changes in the dataset repository (notice this is not the hyper-datasets) @@ -349,28 +349,28 @@ class TriggerScheduler(BaseScheduler): def add_task_trigger( self, - schedule_task_id=None, # type(Union[str, Task]) - schedule_queue=None, # type(str) - schedule_function=None, # type(Callable) - trigger_project=None, # type(str) - trigger_name=None, # type(Optional[str]) - trigger_on_tags=None, # type(Optional[List[str]]) - trigger_on_status=None, # type(Optional[List[str]]) - trigger_exclude_dev_tasks=None, # type(Optional[bool]) - trigger_on_metric=None, # type(Optional[str]) - trigger_on_variant=None, # type(Optional[str]) - trigger_on_threshold=None, # type(Optional[str]) - trigger_on_sign=None, # type(Optional[str]) - trigger_required_tags=None, # type(Optional[List[str]]) - name=None, # type(Optional[str]) - target_project=None, # type(Optional[str]) - add_tag=True, # type(Union[bool, str]) - single_instance=False, # type(bool) - reuse_task=False, # type(bool) - task_parameters=None, # type(Optional[dict]) - task_overrides=None, # type(Optional[dict]) + schedule_task_id=None, # type: Union[str, Task] + schedule_queue=None, # type: str + schedule_function=None, # type: Callable[[str], None] + trigger_project=None, # type: str + trigger_name=None, # type: Optional[str] + trigger_on_tags=None, 
# type: Optional[List[str]] + trigger_on_status=None, # type: Optional[List[str]] + trigger_exclude_dev_tasks=None, # type: Optional[bool] + trigger_on_metric=None, # type: Optional[str] + trigger_on_variant=None, # type: Optional[str] + trigger_on_threshold=None, # type: Optional[float] + trigger_on_sign=None, # type: Optional[str] + trigger_required_tags=None, # type: Optional[List[str]] + name=None, # type: Optional[str] + target_project=None, # type: Optional[str] + add_tag=True, # type: Union[bool, str] + single_instance=False, # type: bool + reuse_task=False, # type: bool + task_parameters=None, # type: Optional[dict] + task_overrides=None, # type: Optional[dict] ): - # type(...) -> bool + # type: (...) -> None """ Create a cron job alike scheduling for a pre existing Task or function. Trigger the Task/function execution on changes in the Task @@ -570,8 +570,8 @@ class TriggerScheduler(BaseScheduler): @staticmethod def __deserialize_triggers(trigger_jobs, trigger_class, current_triggers): - # type (List[dict], TriggerCLass, List[BaseTrigger]) -> List[BaseTrigger] - trigger_jobs = [trigger_class().update(j) for j in trigger_jobs] + # type: (List[dict], BaseTrigger, List[BaseTrigger]) -> List[BaseTrigger] + trigger_jobs = [trigger_class().update(j) for j in trigger_jobs] # noqa trigger_jobs = {j.name: j for j in trigger_jobs} current_triggers = {j.name: j for j in current_triggers} @@ -617,17 +617,17 @@ class TriggerScheduler(BaseScheduler): state_dict = json.loads(state_json_str) self._dataset_triggers = self.__deserialize_triggers( state_dict.get('dataset_triggers', []), - trigger_class=DatasetTrigger, + trigger_class=DatasetTrigger, # noqa current_triggers=self._dataset_triggers ) self._model_triggers = self.__deserialize_triggers( state_dict.get('model_triggers', []), - trigger_class=ModelTrigger, + trigger_class=ModelTrigger, # noqa current_triggers=self._model_triggers ) self._task_triggers = self.__deserialize_triggers( state_dict.get('task_triggers', 
[]), - trigger_class=TaskTrigger, + trigger_class=TaskTrigger, # noqa current_triggers=self._task_triggers ) diff --git a/examples/scheduler/trigger_example.py b/examples/scheduler/trigger_example.py new file mode 100644 index 00000000..c0c616e6 --- /dev/null +++ b/examples/scheduler/trigger_example.py @@ -0,0 +1,49 @@ +from clearml import Task, Dataset, Model +from clearml.automation import TriggerScheduler + + +def trigger_model_func(model_id): + model = Model(model_id=model_id) + print('model id {} modified'.format(model.id)) + + +def trigger_dataset_func(dataset_id): + dataset = Dataset.get(dataset_id=dataset_id) + print('dataset id {} created'.format(dataset.id)) + + +def trigger_task_func(task_id): + task = Task.get_task(task_id=task_id) + print('Task ID {} metric above threshold'.format(task.id)) + + +if __name__ == '__main__': + trigger = TriggerScheduler(pooling_frequency_minutes=3.0) + trigger.add_model_trigger( + name='model deploy', + schedule_function=trigger_model_func, + trigger_project='examples', + trigger_on_tags=['deploy'] + ) + trigger.add_model_trigger( + name='model quality check', + schedule_task_id='add_task_id_here', + schedule_queue='default', + trigger_project='examples', + trigger_on_tags=['deploy'] + ) + trigger.add_dataset_trigger( + name='retrain on dataset', + schedule_function=trigger_dataset_func, + trigger_project='datasets', + trigger_on_tags=['retrain'] + ) + trigger.add_task_trigger( + name='performance high-score', + schedule_function=trigger_task_func, + trigger_project='examples', + trigger_on_metric='epoch_accuracy', trigger_on_variant='epoch_accuracy', + trigger_on_sign='max', + trigger_on_threshold=0.99 + ) + trigger.start_remotely()