Add Task trigger scheduler

Fix TaskScheduler executed instances
allegroai 2021-08-07 19:39:19 +03:00
parent 7bb78c01bf
commit 8708967a5e
4 changed files with 759 additions and 17 deletions

View File

@@ -3,7 +3,7 @@ import logging
 from datetime import datetime
 from threading import Thread, enumerate as enumerate_threads
 from time import sleep, time
-from typing import List, Union, Optional, Callable
+from typing import List, Union, Optional, Callable, Sequence

 from attr import attrs, attrib
 from dateutil.relativedelta import relativedelta
@@ -24,7 +24,7 @@ class BaseScheduleJob(object):
     task_parameters = attrib(type=dict, default={})
     task_overrides = attrib(type=dict, default={})
     clone_task = attrib(type=bool, default=True)
-    _executed_instances = attrib(type=list, default=[])
+    _executed_instances = attrib(type=list, default=None)

     def to_dict(self, full=False):
         return {k: v for k, v in self.__dict__.items()
@@ -53,6 +53,9 @@ class BaseScheduleJob(object):
     def run(self, task_id):
         # type (Optional[str]) -> datetime
         if task_id:
+            # make sure we have a new instance
+            if not self._executed_instances:
+                self._executed_instances = []
             self._executed_instances.append(str(task_id))
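For context on the `_executed_instances` change above: with `attrs`, a plain `default=[]` is evaluated once when the class is defined, so every job instance ends up sharing the same list. Defaulting to `None` and creating the list lazily (or using `attr.Factory(list)`) gives each instance its own copy. A minimal sketch of the pitfall this commit fixes:

```py
from attr import attrs, attrib

@attrs
class Job(object):
    executed = attrib(type=list, default=[])  # one list object, shared by ALL instances

a, b = Job(), Job()
a.executed.append("task-1")
print(b.executed)  # ['task-1'] -- b sees a's append
```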
@@ -230,8 +233,8 @@ class ScheduleJob(BaseScheduleJob):
 @attrs
 class ExecutedJob(object):
-    name = attrib(type=str)
-    started = attrib(type=datetime, converter=datetime_from_isoformat)
+    name = attrib(type=str, default=None)
+    started = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
     finished = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
     task_id = attrib(type=str, default=None)
     thread_id = attrib(type=str, default=None)
@@ -241,8 +244,14 @@ class ExecutedJob(object):
 class BaseScheduler(object):
-    def __init__(self, sync_frequency_minutes=15, force_create_task_name=None, force_create_task_project=None):
-        # type: (float, Optional[str], Optional[str]) -> None
+    def __init__(
+            self,
+            sync_frequency_minutes=15,
+            force_create_task_name=None,
+            force_create_task_project=None,
+            pooling_frequency_minutes=None
+    ):
+        # type: (float, Optional[str], Optional[str], Optional[float]) -> None
         """
         Create a Task scheduler service
@@ -254,6 +263,7 @@ class BaseScheduler(object):
             even if main Task.init already exists.
         """
         self._last_sync = 0
+        self._pooling_frequency_minutes = pooling_frequency_minutes
         self._sync_frequency_minutes = sync_frequency_minutes
         if force_create_task_name or not Task.current_task():
             self._task = Task.init(
@@ -297,6 +307,11 @@ class BaseScheduler(object):
             # rate control
             sleep(15)

+        # sleep until the next pool (default None)
+        if self._pooling_frequency_minutes:
+            self._log("Sleeping until the next pool in {} minutes".format(self._pooling_frequency_minutes))
+            sleep(self._pooling_frequency_minutes * 60.)
+
     def start_remotely(self, queue='services'):
         # type: (str) -> None
         """
@@ -360,8 +375,8 @@ class BaseScheduler(object):
             self._launch_job_task(job)
             self._launch_job_function(job)

-    def _launch_job_task(self, job):
-        # type: (BaseScheduleJob) -> Optional[ClearmlJob]
+    def _launch_job_task(self, job, task_parameters=None, add_tags=None):
+        # type: (BaseScheduleJob, Optional[dict], Optional[List[str]]) -> Optional[ClearmlJob]
         # make sure this is not a function job
         if job.base_function:
             return None
@@ -380,11 +395,12 @@ class BaseScheduler(object):
         # actually run the job
         task_job = ClearmlJob(
             base_task_id=job.base_task_id,
-            parameter_override=job.task_parameters,
+            parameter_override=task_parameters or job.task_parameters,
             task_overrides=job.task_overrides,
             disable_clone_task=not job.clone_task,
             allow_caching=False,
             target_project=job.target_project,
+            tags=[add_tags] if add_tags and isinstance(add_tags, str) else add_tags,
         )
         self._log('Scheduling Job {}, Task {} on queue {}.'.format(
             job.name, task_job.task_id(), job.queue))
@@ -393,8 +409,8 @@ class BaseScheduler(object):
             job.run(task_job.task_id())
         return task_job

-    def _launch_job_function(self, job):
-        # type: (ScheduleJob) -> Optional[Thread]
+    def _launch_job_function(self, job, func_args=None):
+        # type: (BaseScheduleJob, Optional[Sequence]) -> Optional[Thread]
         # make sure this IS a function job
         if not job.base_function:
             return None
@@ -419,7 +435,7 @@ class BaseScheduler(object):
         self._log("Scheduling Job '{}', Task '{}' on background thread".format(
             job.name, job.base_function))
-        t = Thread(target=job.base_function)
+        t = Thread(target=job.base_function, args=func_args or ())
         t.start()
         # mark as run
         job.run(t.ident)
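The new `func_args` plumbing lets a caller pass positional arguments into the scheduled function's background thread; this is how the trigger scheduler later hands the firing object's ID to the callback (see `_launch_job_function(job, func_args=(trigger_id,))` in the new trigger module below). A minimal sketch of the call path, with a toy callback:

```py
from threading import Thread

def on_trigger(obj_id):
    # toy stand-in for job.base_function
    print("triggered by", obj_id)

# equivalent of _launch_job_function(job, func_args=("<some-model-id>",))
t = Thread(target=on_trigger, args=("<some-model-id>",))
t.start()
t.join()
```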
@@ -489,7 +505,8 @@ class TaskScheduler(BaseScheduler):
         # type(...) -> bool
         """
         Create a cron job alike scheduling for a pre existing Task.
-        Notice it is recommended to give the Task a descriptive name, if not provided a random UUID is used.
+        Notice it is recommended to give the schedule entry a descriptive unique name,
+        if not provided a task ID is used.

         Examples:
             Launch every 15 minutes
@@ -763,7 +780,7 @@ class TaskScheduler(BaseScheduler):
         # plot the schedule definition
         columns = [
-            'name', 'base_task_id', 'next_run', 'target_project', 'queue',
+            'name', 'base_task_id', 'base_function', 'next_run', 'target_project', 'queue',
             'minute', 'hour', 'day', 'month', 'year',
             'starting_time', 'execution_limit_hours', 'recurring',
             'single_instance', 'task_parameters', 'task_overrides', 'clone_task',
@@ -772,6 +789,14 @@ class TaskScheduler(BaseScheduler):
         for j in self._schedule_jobs:
             j_dict = j.to_dict()
             j_dict['next_run'] = j.next()
+            j_dict['base_function'] = "{}.{}".format(
+                getattr(j.base_function, '__module__', ''),
+                getattr(j.base_function, '__name__', '')
+            ) if j.base_function else ''
+
+            if not j_dict.get('base_task_id'):
+                j_dict['clone_task'] = ''
+
             row = [
                 str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or '')
                 for c in columns
@@ -802,7 +827,7 @@ class TaskScheduler(BaseScheduler):
             [executed_job.name,
              '<a href="{}">{}</a>'.format(task_link_template.format(
                  project='*', task=executed_job.task_id), executed_job.task_id)
-             if executed_job.task_id else '',
+             if executed_job.task_id else 'function',
              str(executed_job.started).split('.', 1)[0], str(executed_job.finished).split('.', 1)[0]
              ]
         ]

View File

@@ -0,0 +1,716 @@
import json
import logging
from datetime import datetime
from threading import enumerate as enumerate_threads
from typing import List, Optional, Dict

from attr import attrs, attrib

from .job import ClearmlJob
from .scheduler import BaseScheduleJob, BaseScheduler, ExecutedJob
from ..task import Task
from ..backend_api.session.client import APIClient
from ..backend_interface.util import datetime_to_isoformat, datetime_from_isoformat
@attrs
class BaseTrigger(BaseScheduleJob):
    _only_fields = {"id", "name", "last_update", }
    _update_field = None

    project = attrib(default=None, type=str)
    match_name = attrib(default=None, type=str)
    tags = attrib(default=None, type=list)  # type: List[str]
    required_tags = attrib(default=None, type=list)  # type: List[str]
    add_tag = attrib(default=None, type=str)
    last_update = attrib(default=None, type=datetime, converter=datetime_from_isoformat)

    # remember the previous state of IDs answering the specific query;
    # any new object.id returned that is not in the list is a new event.
    # We store a dict of {object_id: datetime}, allowing us to ignore
    # repeated object updates triggering multiple times.
    _triggered_instances = attrib(type=dict, default=None)  # type: Dict[str, datetime]

    def build_query(self, ref_time):
        return {
            'name': self.match_name or None,
            'project': [self.project] if self.project else None,
            'tags': ((self.tags or []) + (self.required_tags or [])) or None,
            self._update_field: ">{}".format(ref_time.isoformat() if ref_time else self.last_update.isoformat())
        }

    def verify(self):
        # type: () -> None
        super(BaseTrigger, self).verify()
        if self.tags and (not isinstance(self.tags, (list, tuple)) or
                          not all(isinstance(s, str) for s in self.tags)):
            raise ValueError("Tags must be a list of strings: {}".format(self.tags))
        if self.required_tags and (not isinstance(self.required_tags, (list, tuple)) or
                                   not all(isinstance(s, str) for s in self.required_tags)):
            raise ValueError("Required tags must be a list of strings: {}".format(self.required_tags))
        if self.project and not isinstance(self.project, str):
            raise ValueError("Project must be a string: {}".format(self.project))
        if self.match_name and not isinstance(self.match_name, str):
            raise ValueError("Match name must be a string: {}".format(self.match_name))

    def get_key(self):
        return getattr(self, '_key', None)
@attrs
class ModelTrigger(BaseTrigger):
    _task_param = '${model.id}'
    _key = "models"
    _only_fields = {"id", "name", "last_update", "ready", "tags"}
    _update_field = "last_update"

    on_publish = attrib(type=bool, default=None)
    on_archive = attrib(type=bool, default=None)

    def build_query(self, ref_time):
        query = super(ModelTrigger, self).build_query(ref_time)
        if self.on_publish:
            query.update({'ready': True})
        if self.on_archive:
            system_tags = list(set(query.get('system_tags', []) + ['archived']))
            query.update({'system_tags': system_tags})
        return query
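To make the query construction concrete: here is roughly the server-side filter a publish trigger produces (the IDs and the match pattern below are placeholders for illustration):

```py
from datetime import datetime

trigger = ModelTrigger(
    name='on-publish',
    base_task_id='<base-task-id>',
    queue='default',
    match_name=r'^resnet',
    project='<project-id>',
    on_publish=True,
)
print(trigger.build_query(ref_time=datetime(2021, 8, 7)))
# {'name': '^resnet', 'project': ['<project-id>'], 'tags': None,
#  'last_update': '>2021-08-07T00:00:00', 'ready': True}
```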
@attrs
class DatasetTrigger(BaseTrigger):
    _task_param = '${dataset.id}'
    _key = "tasks"
    _only_fields = {"id", "name", "last_update", "status", "completed", "tags"}
    _update_field = "last_update"

    on_publish = attrib(type=bool, default=None)
    on_archive = attrib(type=bool, default=None)

    def build_query(self, ref_time):
        query = super(DatasetTrigger, self).build_query(ref_time)
        if self.on_publish:
            query.update({'status': ['published']})
        if self.on_archive:
            system_tags = list(set(query.get('system_tags', []) + ['archived']))
            query.update({'system_tags': system_tags})
        return query
@attrs
class TaskTrigger(BaseTrigger):
    _task_param = '${task.id}'
    _key = "tasks"
    _only_fields = {"id", "name", "last_update", "status", "completed", "tags"}
    _update_field = "last_update"

    metrics = attrib(default=None, type=str)
    variant = attrib(default=None, type=str)
    threshold = attrib(default=None, type=float)
    value_sign = attrib(default=None, type=str)
    exclude_dev = attrib(default=None, type=bool)
    on_status = attrib(type=str, default=None)

    def build_query(self, ref_time):
        query = super(TaskTrigger, self).build_query(ref_time)
        if self.exclude_dev:
            system_tags = list(set(query.get('system_tags', []) + ['-development']))
            query.update({'system_tags': system_tags})
        if self.on_status:
            query.update({'status': self.on_status})
        if self.metrics and self.variant and self.threshold:
            metrics, title, series, values = ClearmlJob.get_metric_req_params(self.metrics, self.variant)
            sign_max = (self.value_sign or '').lower() in ('max', 'maximum')
            filter_key = "last_metrics.{}.{}.{}".format(title, series, "max_value" if sign_max else "min_value")
            filter_value = [self.threshold, None] if sign_max else [None, self.threshold]
            query.update({filter_key: filter_value})
        return query

    def verify(self):
        # type: () -> None
        super(TaskTrigger, self).verify()
        if (self.metrics or self.variant or self.threshold is not None) and \
                not (self.metrics and self.variant and self.threshold is not None):
            raise ValueError("You must provide metric/variant/threshold")
        valid_status = [str(s) for s in Task.TaskStatusEnum]
        if self.on_status and not all(s in valid_status for s in self.on_status):
            raise ValueError("on_status contains an invalid status value: {}".format(self.on_status))
        valid_signs = ['min', 'minimum', 'max', 'maximum']
        if self.value_sign and self.value_sign not in valid_signs:
            raise ValueError("Invalid value_sign `{}`, valid options are: {}".format(self.value_sign, valid_signs))
@attrs
class ExecutedTrigger(ExecutedJob):
    trigger = attrib(type=str, default=None)
class TriggerScheduler(BaseScheduler):
    """
    Trigger Task execution if an event happens in the system.
    Examples:
        a new model is published/tagged,
        a new Dataset is created,
        a general Task failed,
        a Task metric went below/above a threshold, alert every X minutes
    """

    _datasets_section = "datasets"
    _models_section = "models"
    _tasks_section = "tasks"
    _state_section = "state"

    def __init__(
            self,
            pooling_frequency_minutes=3.0,
            sync_frequency_minutes=15,
            force_create_task_name=None,
            force_create_task_project=None
    ):
        # type: (float, float, Optional[str], Optional[str]) -> None
        """
        Create a Task trigger service

        :param pooling_frequency_minutes: Check for new events every X minutes (default 3)
        :param sync_frequency_minutes: Sync task scheduler configuration every X minutes.
            Allows changing the triggers at runtime by editing the Task configuration object
        :param force_create_task_name: Optional, force creation of the Task Scheduler service,
            even if a main Task.init already exists.
        :param force_create_task_project: Optional, force creation of the Task Scheduler service,
            even if a main Task.init already exists.
        """
        super(TriggerScheduler, self).__init__(
            sync_frequency_minutes=sync_frequency_minutes,
            force_create_task_name=force_create_task_name,
            force_create_task_project=force_create_task_project,
            pooling_frequency_minutes=pooling_frequency_minutes,
        )
        self._task_triggers = []
        self._dataset_triggers = []
        self._model_triggers = []
        self._executed_triggers = []
        self._client = None
    def add_model_trigger(
            self,
            schedule_task_id=None,  # type: Union[str, Task]
            schedule_queue=None,  # type: str
            schedule_function=None,  # type: Callable
            trigger_project=None,  # type: str
            trigger_name=None,  # type: Optional[str]
            trigger_on_publish=None,  # type: bool
            trigger_on_tags=None,  # type: Optional[List[str]]
            trigger_on_archive=None,  # type: bool
            trigger_required_tags=None,  # type: Optional[List[str]]
            name=None,  # type: Optional[str]
            target_project=None,  # type: Optional[str]
            add_tag=True,  # type: Union[bool, str]
            single_instance=False,  # type: bool
            reuse_task=False,  # type: bool
            task_parameters=None,  # type: Optional[dict]
            task_overrides=None,  # type: Optional[dict]
    ):
        # type: (...) -> bool
        """
        Create a cron-job-like scheduling for a pre-existing Task or function.
        Trigger the Task/function execution on changes in the model repository.
        Notice it is recommended to give the trigger a descriptive unique name; if not provided, a task ID is used.

        Notice `task_overrides` can accept a reference to the trigger model ID,
        for example: task_overrides={'Args/model_id': '${model.id}'}
        Notice if schedule_function is passed, use the following function interface:

        ```py
        def schedule_function(model_id):
            pass
        ```

        :param schedule_task_id: Task/task ID to be cloned and scheduled for execution
        :param schedule_queue: Queue name or ID to put the Task into (i.e. schedule)
        :param schedule_function: Optional, instead of providing a Task ID to be scheduled,
            provide a function to be called. Notice the function is called from the scheduler context
            (i.e. running on the same machine as the scheduler)
        :param name: Name or description for the cron Task (should be unique if provided, otherwise randomly generated)
        :param trigger_project: Only monitor models from this specific project (not recursive)
        :param trigger_name: Trigger only on models with a matching name (regexp)
        :param trigger_on_publish: Trigger when a model is published
        :param trigger_on_tags: Trigger when all tags in the list are present
        :param trigger_on_archive: Trigger when a model is archived
        :param trigger_required_tags: Trigger only on models with the following additional tags (must include all tags)
        :param target_project: Specify the target project for the cloned scheduled Task
        :param add_tag: Add a tag to the executed Task. Provide a specific tag (str), or
            pass True (default) to use the trigger name as the tag
        :param single_instance: If True, do not launch the Task job if the previous instance is still running
            (skip until the next scheduled time period). Default False.
        :param reuse_task: If True, re-enqueue the same Task (i.e. do not clone it) every time. Default False.
        :param task_parameters: Configuration parameters for the executed Task,
            for example: {'Args/batch': '12'}. Notice: not available when reuse_task=True.
        :param task_overrides: Change the task definition,
            for example {'script.version_num': None, 'script.branch': 'main'}. Notice: not available when reuse_task=True.
        :return: True if the job is successfully added to the scheduling list
        """
        trigger = ModelTrigger(
            base_task_id=schedule_task_id,
            base_function=schedule_function,
            queue=schedule_queue,
            name=name,
            target_project=target_project,
            single_instance=single_instance,
            task_parameters=task_parameters,
            task_overrides=task_overrides,
            add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None,
            clone_task=not bool(reuse_task),
            match_name=trigger_name,
            project=Task.get_project_id(trigger_project) if trigger_project else None,
            tags=trigger_on_tags,
            required_tags=trigger_required_tags,
            on_publish=trigger_on_publish,
            on_archive=trigger_on_archive,
        )
        trigger.verify()
        self._model_triggers.append(trigger)
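Putting the model trigger together, a minimal usage sketch (the import path follows the module location implied by the relative imports in this diff; the queue, project, and task IDs are placeholders):

```py
from clearml.automation.trigger import TriggerScheduler

scheduler = TriggerScheduler(pooling_frequency_minutes=3.0)
scheduler.add_model_trigger(
    name='retrain-on-publish',
    schedule_task_id='<template-task-id>',
    schedule_queue='default',
    trigger_project='examples',
    trigger_on_publish=True,
    # '${model.id}' is replaced with the triggering model's ID at launch time
    task_parameters={'Args/model_id': '${model.id}'},
)
scheduler.start()  # blocking; polls for new events every 3 minutes
```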
    def add_dataset_trigger(
            self,
            schedule_task_id=None,  # type: Union[str, Task]
            schedule_queue=None,  # type: str
            schedule_function=None,  # type: Callable
            trigger_project=None,  # type: str
            trigger_name=None,  # type: Optional[str]
            trigger_on_publish=None,  # type: bool
            trigger_on_tags=None,  # type: Optional[List[str]]
            trigger_on_archive=None,  # type: bool
            trigger_required_tags=None,  # type: Optional[List[str]]
            name=None,  # type: Optional[str]
            target_project=None,  # type: Optional[str]
            add_tag=True,  # type: Union[bool, str]
            single_instance=False,  # type: bool
            reuse_task=False,  # type: bool
            task_parameters=None,  # type: Optional[dict]
            task_overrides=None,  # type: Optional[dict]
    ):
        # type: (...) -> bool
        """
        Create a cron-job-like scheduling for a pre-existing Task or function.
        Trigger the Task/function execution on changes in the dataset repository
        (note this does not cover the hyper-datasets).
        Notice it is recommended to give the trigger a descriptive unique name; if not provided, a task ID is used.

        Notice `task_overrides` can accept a reference to the trigger dataset ID,
        for example: task_overrides={'Args/dataset_id': '${dataset.id}'}
        Notice if schedule_function is passed, use the following function interface:

        ```py
        def schedule_function(dataset_id):
            pass
        ```

        :param schedule_task_id: Task/task ID to be cloned and scheduled for execution
        :param schedule_queue: Queue name or ID to put the Task into (i.e. schedule)
        :param schedule_function: Optional, instead of providing a Task ID to be scheduled,
            provide a function to be called. Notice the function is called from the scheduler context
            (i.e. running on the same machine as the scheduler)
        :param name: Name or description for the cron Task (should be unique if provided, otherwise randomly generated)
        :param trigger_project: Only monitor datasets from this specific project (not recursive)
        :param trigger_name: Trigger only on datasets with a matching name (regexp)
        :param trigger_on_publish: Trigger when a dataset is published
        :param trigger_on_tags: Trigger when all tags in the list are present
        :param trigger_on_archive: Trigger when a dataset is archived
        :param trigger_required_tags: Trigger only on datasets with the
            following additional tags (must include all tags)
        :param target_project: Specify the target project for the cloned scheduled Task
        :param add_tag: Add a tag to the executed Task. Provide a specific tag (str), or
            pass True (default) to use the trigger name as the tag
        :param single_instance: If True, do not launch the Task job if the previous instance is still running
            (skip until the next scheduled time period). Default False.
        :param reuse_task: If True, re-enqueue the same Task (i.e. do not clone it) every time. Default False.
        :param task_parameters: Configuration parameters for the executed Task,
            for example: {'Args/batch': '12'}. Notice: not available when reuse_task=True.
        :param task_overrides: Change the task definition,
            for example {'script.version_num': None, 'script.branch': 'main'}. Notice: not available when reuse_task=True.
        :return: True if the job is successfully added to the scheduling list
        """
        trigger = DatasetTrigger(
            base_task_id=schedule_task_id,
            base_function=schedule_function,
            queue=schedule_queue,
            name=name,
            target_project=target_project,
            single_instance=single_instance,
            task_parameters=task_parameters,
            task_overrides=task_overrides,
            add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None,
            clone_task=not bool(reuse_task),
            match_name=trigger_name,
            project=Task.get_project_id(trigger_project) if trigger_project else None,
            tags=trigger_on_tags,
            required_tags=trigger_required_tags,
            on_publish=trigger_on_publish,
            on_archive=trigger_on_archive,
        )
        trigger.verify()
        self._dataset_triggers.append(trigger)
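A function-based variant for the dataset trigger, following the `schedule_function(dataset_id)` interface from the docstring (reusing the `scheduler` instance from the previous sketch; the names are illustrative):

```py
def process_new_dataset(dataset_id):
    # runs on the scheduler machine, in a background thread,
    # with the ID of the dataset task that fired the trigger
    print("new dataset version:", dataset_id)

scheduler.add_dataset_trigger(
    name='on-dataset-update',
    schedule_function=process_new_dataset,
    trigger_project='examples/datasets',
    trigger_on_tags=['production'],
)
```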
    def add_task_trigger(
            self,
            schedule_task_id=None,  # type: Union[str, Task]
            schedule_queue=None,  # type: str
            schedule_function=None,  # type: Callable
            trigger_project=None,  # type: str
            trigger_name=None,  # type: Optional[str]
            trigger_on_tags=None,  # type: Optional[List[str]]
            trigger_on_status=None,  # type: Optional[List[str]]
            trigger_exclude_dev_tasks=None,  # type: Optional[bool]
            trigger_on_metric=None,  # type: Optional[str]
            trigger_on_variant=None,  # type: Optional[str]
            trigger_on_threshold=None,  # type: Optional[float]
            trigger_on_sign=None,  # type: Optional[str]
            trigger_required_tags=None,  # type: Optional[List[str]]
            name=None,  # type: Optional[str]
            target_project=None,  # type: Optional[str]
            add_tag=True,  # type: Union[bool, str]
            single_instance=False,  # type: bool
            reuse_task=False,  # type: bool
            task_parameters=None,  # type: Optional[dict]
            task_overrides=None,  # type: Optional[dict]
    ):
        # type: (...) -> bool
        """
        Create a cron-job-like scheduling for a pre-existing Task or function.
        Trigger the Task/function execution on changes in a monitored Task.
        Notice it is recommended to give the trigger a descriptive unique name; if not provided, a task ID is used.

        Notice `task_overrides` can accept a reference to the trigger task ID,
        for example: task_overrides={'Args/task_id': '${task.id}'}
        Notice if schedule_function is passed, use the following function interface:

        ```py
        def schedule_function(task_id):
            pass
        ```

        :param schedule_task_id: Task/task ID to be cloned and scheduled for execution
        :param schedule_queue: Queue name or ID to put the Task into (i.e. schedule)
        :param schedule_function: Optional, instead of providing a Task ID to be scheduled,
            provide a function to be called. Notice the function is called from the scheduler context
            (i.e. running on the same machine as the scheduler)
        :param name: Name or description for the cron Task (should be unique if provided, otherwise randomly generated)
        :param trigger_project: Only monitor tasks from this specific project (not recursive)
        :param trigger_name: Trigger only on tasks with a matching name (regexp)
        :param trigger_on_tags: Trigger when all tags in the list are present
        :param trigger_required_tags: Trigger only on tasks with the following additional tags (must include all tags)
        :param trigger_on_status: Trigger on Task status change.
            Expects a list of status strings, e.g. ['failed', 'published']
        :param trigger_exclude_dev_tasks: If True, only trigger on Tasks executed by clearml-agent (and not manually)
        :param trigger_on_metric: Trigger on metric/variant above/under threshold (metric=title, variant=series)
        :param trigger_on_variant: Trigger on metric/variant above/under threshold (metric=title, variant=series)
        :param trigger_on_threshold: Trigger on metric/variant above/under threshold (float number)
        :param trigger_on_sign: Possible values: "max"/"maximum" or "min"/"minimum".
            Trigger the Task if the metric goes below the minimum or above the maximum. Default: "minimum"
        :param target_project: Specify the target project for the cloned scheduled Task
        :param add_tag: Add a tag to the executed Task. Provide a specific tag (str), or
            pass True (default) to use the trigger name as the tag
        :param single_instance: If True, do not launch the Task job if the previous instance is still running
            (skip until the next scheduled time period). Default False.
        :param reuse_task: If True, re-enqueue the same Task (i.e. do not clone it) every time. Default False.
        :param task_parameters: Configuration parameters for the executed Task,
            for example: {'Args/batch': '12'}. Notice: not available when reuse_task=True.
        :param task_overrides: Change the task definition,
            for example {'script.version_num': None, 'script.branch': 'main'}. Notice: not available when reuse_task=True.
        :return: True if the job is successfully added to the scheduling list
        """
        trigger = TaskTrigger(
            base_task_id=schedule_task_id,
            base_function=schedule_function,
            queue=schedule_queue,
            name=name,
            target_project=target_project,
            single_instance=single_instance,
            task_parameters=task_parameters,
            task_overrides=task_overrides,
            add_tag=(add_tag if isinstance(add_tag, str) else (name or schedule_task_id)) if add_tag else None,
            clone_task=not bool(reuse_task),
            match_name=trigger_name,
            project=Task.get_project_id(trigger_project) if trigger_project else None,
            tags=trigger_on_tags,
            required_tags=trigger_required_tags,
            on_status=trigger_on_status,
            exclude_dev=trigger_exclude_dev_tasks,
            metrics=trigger_on_metric,
            variant=trigger_on_variant,
            threshold=trigger_on_threshold,
            value_sign=trigger_on_sign
        )
        trigger.verify()
        self._task_triggers.append(trigger)
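And a task trigger that enqueues an alert task whenever a monitored, agent-executed task fails (IDs and names are placeholders):

```py
scheduler.add_task_trigger(
    name='alert-on-failure',
    schedule_task_id='<alert-task-id>',
    schedule_queue='services',
    trigger_project='examples',
    trigger_on_status=['failed'],
    trigger_exclude_dev_tasks=True,
    add_tag='alert',  # tag the executed alert task
)
```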
    def start(self):
        """
        Start the Task trigger loop (notice this function does not return)
        """
        super(TriggerScheduler, self).start()

    def get_triggers(self):
        # type: () -> List[BaseTrigger]
        """
        Return all triggers (models, datasets, tasks)

        :return: List of trigger objects
        """
        return self._model_triggers + self._dataset_triggers + self._task_triggers
    def _step(self):
        if not self._client:
            self._client = APIClient()

        executed = False
        for trigger in (self._model_triggers + self._dataset_triggers + self._task_triggers):
            ref_time = datetime_from_isoformat(trigger.last_update or datetime.utcnow())
            objects = []
            try:
                # noinspection PyProtectedMember
                objects = getattr(self._client, trigger.get_key()).get_all(
                    _allow_extra_fields_=True,
                    only_fields=list(trigger._only_fields or []),
                    **trigger.build_query(ref_time)
                )
                trigger.last_update = max([o.last_update for o in objects] or [ref_time])
                if not objects:
                    continue
            except Exception as ex:
                self._log("Exception occurred while checking trigger '{}' state: {}".format(trigger, ex))

            executed |= bool(objects)
            # actually handle trigger
            for obj in objects:
                # create a unique instance list
                if not trigger._triggered_instances:
                    trigger._triggered_instances = {}
                if obj.id in trigger._triggered_instances:
                    continue
                trigger._triggered_instances[obj.id] = datetime.utcnow()
                self._launch_job(trigger, obj.id)

        return executed
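The `_triggered_instances` comment near the top of the module describes the invariant `_step()` enforces here: each object ID fires at most once, however many polls return it. Reduced to a toy loop:

```py
from datetime import datetime

triggered = {}  # {object_id: first-seen time}, as in _triggered_instances
for obj_id in ['model-1', 'model-2', 'model-1']:  # 'model-1' updated twice
    if obj_id in triggered:
        continue  # repeated update of a known object: do not fire again
    triggered[obj_id] = datetime.utcnow()
    print('launching job for', obj_id)  # fires for 'model-1' and 'model-2' only
```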
    # noinspection PyMethodOverriding
    def _launch_job(self, job, trigger_id):
        # type: (BaseTrigger, str) -> None
        if job.base_task_id:
            task_parameters = None
            if job.task_parameters:
                task_parameters = {
                    k: trigger_id if v == job._task_param else v  # noqa
                    for k, v in job.task_parameters.items()
                }
            task_job = self._launch_job_task(
                job,
                task_parameters=task_parameters,
                add_tags=job.add_tag or None,
            )
            if task_job:
                self._executed_triggers.append(
                    ExecutedTrigger(
                        name=job.name,
                        task_id=task_job.task_id(),
                        started=datetime.utcnow(),
                        trigger=str(job.__class__.__name__))
                )
        if job.base_function:
            thread_job = self._launch_job_function(job, func_args=(trigger_id, ))
            if thread_job:
                self._executed_triggers.append(
                    ExecutedTrigger(
                        name=job.name,
                        thread_id=str(thread_job.ident),
                        started=datetime.utcnow(),
                        trigger=str(job.__class__.__name__))
                )
    def _serialize(self):
        # noinspection PyProtectedMember
        self._task._set_configuration(
            config_type='json',
            description='Dataset trigger configuration',
            config_text=json.dumps(
                [j.to_dict() for j in self._dataset_triggers], default=datetime_to_isoformat),
            name=self._datasets_section,
        )
        # noinspection PyProtectedMember
        self._task._set_configuration(
            config_type='json',
            description='Model trigger configuration',
            config_text=json.dumps(
                [j.to_dict() for j in self._model_triggers], default=datetime_to_isoformat),
            name=self._models_section,
        )
        # noinspection PyProtectedMember
        self._task._set_configuration(
            config_type='json',
            description='Task trigger configuration',
            config_text=json.dumps(
                [j.to_dict() for j in self._task_triggers], default=datetime_to_isoformat),
            name=self._tasks_section,
        )
    def _deserialize(self):
        self._task.reload()
        self._dataset_triggers = self.__deserialize_section(
            section=self._datasets_section, trigger_class=DatasetTrigger, current_triggers=self._dataset_triggers)
        self._model_triggers = self.__deserialize_section(
            section=self._models_section, trigger_class=ModelTrigger, current_triggers=self._model_triggers)
        self._task_triggers = self.__deserialize_section(
            section=self._tasks_section, trigger_class=TaskTrigger, current_triggers=self._task_triggers)

    def __deserialize_section(self, section, trigger_class, current_triggers):
        # noinspection PyProtectedMember
        json_str = self._task._get_configuration_text(name=section)
        try:
            return self.__deserialize_triggers(json.loads(json_str), trigger_class, current_triggers)
        except Exception as ex:
            self._log('Failed deserializing configuration: {}'.format(ex), level=logging.WARN)
            return current_triggers

    @staticmethod
    def __deserialize_triggers(trigger_jobs, trigger_class, current_triggers):
        # type: (List[dict], Type[BaseTrigger], List[BaseTrigger]) -> List[BaseTrigger]
        trigger_jobs = [trigger_class().update(j) for j in trigger_jobs]
        trigger_jobs = {j.name: j for j in trigger_jobs}
        current_triggers = {j.name: j for j in current_triggers}

        # select only valid jobs, and update the valid ones' state from the current one
        new_triggers = [
            current_triggers[name].update(j) if name in current_triggers else j
            for name, j in trigger_jobs.items()
        ]
        # verify all jobs
        for j in new_triggers:
            j.verify()

        return new_triggers
    def _serialize_state(self):
        json_str = json.dumps(
            dict(
                dataset_triggers=[j.to_dict(full=True) for j in self._dataset_triggers],
                model_triggers=[j.to_dict(full=True) for j in self._model_triggers],
                task_triggers=[j.to_dict(full=True) for j in self._task_triggers],
                # pooling_frequency_minutes=self._pooling_frequency_minutes,
                # sync_frequency_minutes=self._sync_frequency_minutes,
            ),
            default=datetime_to_isoformat
        )
        self._task.upload_artifact(
            name=self._state_section,
            artifact_object=json_str,
            preview='scheduler internal state'
        )

    def _deserialize_state(self):
        # get artifact
        self._task.reload()
        artifact_object = self._task.artifacts.get(self._state_section)
        if artifact_object is None:
            return
        state_json_str = artifact_object.get()
        if state_json_str is None:
            return

        state_dict = json.loads(state_json_str)
        self._dataset_triggers = self.__deserialize_triggers(
            state_dict.get('dataset_triggers', []),
            trigger_class=DatasetTrigger,
            current_triggers=self._dataset_triggers
        )
        self._model_triggers = self.__deserialize_triggers(
            state_dict.get('model_triggers', []),
            trigger_class=ModelTrigger,
            current_triggers=self._model_triggers
        )
        self._task_triggers = self.__deserialize_triggers(
            state_dict.get('task_triggers', []),
            trigger_class=TaskTrigger,
            current_triggers=self._task_triggers
        )
    def _update_execution_plots(self):
        # type: () -> None
        if not self._task:
            return

        task_link_template = self._task.get_output_log_web_page() \
            .replace('/{}/'.format(self._task.project), '/{project}/') \
            .replace('/{}/'.format(self._task.id), '/{task}/')

        # plot the already executed Tasks
        executed_table = [['trigger', 'name', 'task id', 'started', 'finished']]
        for executed_job in sorted(self._executed_triggers, key=lambda x: x.started, reverse=True):
            if not executed_job.finished:
                if executed_job.task_id:
                    t = Task.get_task(task_id=executed_job.task_id)
                    if t.status not in ('in_progress', 'queued'):
                        executed_job.finished = t.data.completed or datetime.utcnow()
                elif executed_job.thread_id:
                    # noinspection PyBroadException
                    try:
                        a_thread = [t for t in enumerate_threads() if t.ident == executed_job.thread_id]
                        if not a_thread or not a_thread[0].is_alive():
                            executed_job.finished = datetime.utcnow()
                    except Exception:
                        pass

            executed_table += [
                [executed_job.trigger,
                 executed_job.name,
                 '<a href="{}">{}</a>'.format(task_link_template.format(
                     project='*', task=executed_job.task_id), executed_job.task_id)
                 if executed_job.task_id else 'function',
                 str(executed_job.started).split('.', 1)[0],
                 str(executed_job.finished).split('.', 1)[0]
                 ]
            ]

        # plot the schedule definition
        self._task.get_logger().report_table(
            title='Triggers Executed', series=' ', iteration=0,
            table_plot=executed_table
        )
        self.__report_trigger_table(triggers=self._model_triggers, title='Model Triggers')
        self.__report_trigger_table(triggers=self._dataset_triggers, title='Dataset Triggers')
        self.__report_trigger_table(triggers=self._task_triggers, title='Task Triggers')
    def __report_trigger_table(self, triggers, title):
        if not triggers:
            return

        task_link_template = self._task.get_output_log_web_page() \
            .replace('/{}/'.format(self._task.project), '/{project}/') \
            .replace('/{}/'.format(self._task.id), '/{task}/')

        columns = [k for k in BaseTrigger().__dict__.keys() if not k.startswith('_')]
        columns += [k for k in triggers[0].__dict__.keys() if k not in columns and not k.startswith('_')]
        column_task_id = columns.index('base_task_id')

        scheduler_table = [columns]
        for j in triggers:
            j_dict = j.to_dict()
            j_dict['base_function'] = "{}.{}".format(
                getattr(j.base_function, '__module__', ''),
                getattr(j.base_function, '__name__', '')
            ) if j.base_function else ''

            if not j_dict.get('base_task_id'):
                j_dict['clone_task'] = ''

            row = [
                str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or '')
                for c in columns
            ]
            if row[column_task_id]:
                row[column_task_id] = '<a href="{}">{}</a>'.format(
                    task_link_template.format(project='*', task=row[column_task_id]), row[column_task_id])
            scheduler_table += [row]

        self._task.get_logger().report_table(
            title=title, series=' ', iteration=0,
            table_plot=scheduler_table
        )

View File

@@ -160,4 +160,4 @@ def datetime_from_isoformat(o):
         return None
     if isinstance(o, datetime):
         return o
-    return datetime.strptime(o, "%Y-%m-%dT%H:%M:%S.%f")
+    return datetime.strptime(o.split('+')[0], "%Y-%m-%dT%H:%M:%S.%f")

View File

@@ -403,7 +403,8 @@ class Task(_Task):
                 raise UsageError(
                     "Current task already created "
                     "and requested {field} '{default}' does not match current {field} '{current}'. "
-                    "If you wish to create additional tasks use `Task.create`".format(
+                    "If you wish to create additional tasks use `Task.create`, "
+                    "or close the current task with `task.close()` before calling `Task.init(...)`".format(
                         field=field,
                         default=default,
                         current=current,