Task scheduler support combining weekdays and recurring scheduled jobs

This commit is contained in:
allegroai 2021-08-01 00:10:08 +03:00
parent 774957797e
commit 071caf5302

View File

@ -1,22 +1,24 @@
import json
from datetime import datetime
from threading import Thread, enumerate as enumerate_threads
from time import sleep, time
from typing import List, Union, Optional
from typing import List, Union, Optional, Callable
from attr import attrs, attrib
from dateutil.relativedelta import relativedelta
from .job import ClearmlJob
from ..backend_interface.util import datetime_from_isoformat, datetime_to_isoformat
from ..backend_interface.util import datetime_from_isoformat, datetime_to_isoformat, mutually_exclusive
from ..task import Task
@attrs
class ScheduleJob(object):
_weekdays = ('sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday')
_weekdays_ind = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday')
name = attrib(type=str)
base_task_id = attrib(type=str)
base_function = attrib(type=Callable, default=None)
queue = attrib(type=str, default=None)
target_project = attrib(type=str, default=None)
execution_limit_hours = attrib(type=float, default=None)
@ -29,6 +31,7 @@ class ScheduleJob(object):
minute = attrib(type=float, default=None)
hour = attrib(type=float, default=None)
day = attrib(default=None)
weekdays = attrib(default=None)
month = attrib(type=float, default=None)
year = attrib(type=float, default=None)
_executed_instances = attrib(type=list, default=[])
@ -37,13 +40,28 @@ class ScheduleJob(object):
_execution_timeout = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
def to_dict(self, full=False):
return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith('_')}
return {k: v for k, v in self.__dict__.items()
if not callable(v) and (full or not str(k).startswith('_'))}
def update(self, a_job):
for k, v in a_job.to_dict().items():
setattr(self, k, v)
if not callable(getattr(self, k, v)):
setattr(self, k, v)
return self
def verify(self):
# type () -> None
if self.weekdays and self.day not in (None, 0, 1):
raise ValueError("`weekdays` and `day` combination is not valid (day must be None,0 or 1)")
if self.base_function and not self.name:
raise ValueError("Entry 'name' must be supplied for function scheduling")
if self.base_task_id and not self.queue:
raise ValueError("Target 'queue' must be provided for function scheduling")
if not (self.minute or self.hour or self.day or self.month or self.year):
raise ValueError("Schedule time/date was not provided")
if not self.base_function and not self.base_task_id:
raise ValueError("Either schedule function or task-id must be provided")
def next_run(self):
# type () -> Optional[datetime]
return self._next_run
@ -82,11 +100,17 @@ class ScheduleJob(object):
hour=int(self.hour or 0),
minute=int(self.minute or 0)
)
if self.weekdays:
self._next_run += relativedelta(weekday=self.get_weekday_ord(self.weekdays[0]))
return self._next_run
weekday = self.day if isinstance(self.day, str) and self.day.lower() in self._weekdays else None
if weekday:
weekday = self._weekdays.index(weekday)
weekday = None
if self.weekdays:
# get previous weekday
_weekdays = [self.get_weekday_ord(w) for w in self.weekdays]
prev_weekday_ind = _weekdays.index(self._last_executed.weekday()) if self._last_executed else -1
weekday = _weekdays[(prev_weekday_ind+1) % len(_weekdays)]
# check if we have a specific day of the week
prev_timestamp = self._last_executed or self.starting_time
@ -95,19 +119,67 @@ class ScheduleJob(object):
prev_timestamp = datetime(
year=prev_timestamp.year,
month=self.month or prev_timestamp.month,
day=self.day or prev_timestamp.day,
day=self.day or 1,
)
elif self.month:
prev_timestamp = datetime(
year=prev_timestamp.year,
month=prev_timestamp.month,
day=self.day or prev_timestamp.day,
day=self.day or 1,
)
elif self.day or weekday is not None:
elif self.day is None and weekday is not None:
# notice we assume every X hours on specific weekdays
# other combinations (i.e. specific time at weekdays, is covered later)
next_timestamp = datetime(
year=prev_timestamp.year,
month=prev_timestamp.month,
day=prev_timestamp.day,
hour=prev_timestamp.hour,
minute=prev_timestamp.minute,
)
next_timestamp += relativedelta(
years=self.year or 0,
months=0 if self.year else (self.month or 0),
hours=self.hour or 0,
minutes=self.minute or 0,
weekday=weekday if not self._last_executed else None,
)
# start a new day
if next_timestamp.day != prev_timestamp.day:
next_timestamp = datetime(
year=prev_timestamp.year,
month=prev_timestamp.month,
day=prev_timestamp.day,
) + relativedelta(
years=self.year or 0,
months=0 if self.year else (self.month or 0),
hours=self.hour or 0,
minutes=self.minute or 0,
weekday=weekday
)
self._next_run = next_timestamp
return self._next_run
elif self.day is not None and weekday is not None:
# push to the next day (so we only have once a day)
prev_timestamp = datetime(
year=prev_timestamp.year,
month=prev_timestamp.month,
day=prev_timestamp.day if weekday is None else 1,
day=prev_timestamp.day,
) + relativedelta(days=1)
elif self.day:
# reset minutes in the hour (we will be adding additional hour anyhow
prev_timestamp = datetime(
year=prev_timestamp.year,
month=prev_timestamp.month,
day=prev_timestamp.day,
)
elif self.hour:
# reset minutes in the hour (we will be adding additional hour anyhow
prev_timestamp = datetime(
year=prev_timestamp.year,
month=prev_timestamp.month,
day=prev_timestamp.day,
hour=prev_timestamp.hour,
)
self._next_run = prev_timestamp + relativedelta(
@ -134,13 +206,21 @@ class ScheduleJob(object):
self._execution_timeout = None
return self._last_executed
@classmethod
def get_weekday_ord(cls, weekday):
# type: (Union[int, str]) -> int
if isinstance(weekday, int):
return min(6, max(weekday, 0))
return cls._weekdays_ind.index(weekday)
@attrs
class ExecutedJob(object):
name = attrib(type=str)
task_id = attrib(type=str)
started = attrib(type=datetime, converter=datetime_from_isoformat)
finished = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
task_id = attrib(type=str, default=None)
thread_id = attrib(type=str, default=None)
def to_dict(self, full=False):
return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith('_')}
@ -168,7 +248,7 @@ class TaskScheduler(object):
self._last_sync = 0
self._sync_frequency_minutes = sync_frequency_minutes
self._schedule_jobs = [] # List[ScheduleJob]
self._timeout_jobs = {} # Dict[str, datetime]
self._timeout_jobs = {} # Dict[datetime, str]
self._executed_jobs = [] # List[ExecutedJob]
self._thread = None
if force_create_task_name or not Task.current_task():
@ -183,15 +263,17 @@ class TaskScheduler(object):
def add_task(
self,
task_id, # type(Union[str, Task])
queue, # type(str)
schedule_task_id=None, # type(Union[str, Task])
schedule_function=None, # type(Callable)
queue=None, # type(str)
name=None, # type(Optional[str])
target_project=None, # type(Optional[str])
minute=None, # type(Optional[float])
hour=None, # type(Optional[float])
day=None, # type(Optional[Union[float, str]])
month=None, # type(Optional[float])
year=None, # type(Optional[float])
minute=None, # type(Optional[int])
hour=None, # type(Optional[int])
day=None, # type(Optional[int])
weekdays=None, # type(Optional[List[str]])
month=None, # type(Optional[int])
year=None, # type(Optional[int])
limit_execution_time=None, # type(Optional[float])
single_instance=False, # type(bool)
recurring=True, # type(bool)
@ -203,32 +285,39 @@ class TaskScheduler(object):
"""
Create a cron job alike scheduling for a pre existing Task.
Notice it is recommended to give the Task a descriptive name, if not provided a random UUID is used.
Examples:
Launch every 15 minutes
add_task(task_id='1235', queue='default', minute=15)
add_task(task_id='1235', queue='default', minute=15)
Launch every 1 hour
add_task(task_id='1235', queue='default', hour=1)
Launch every 1 hour and a half
add_task(task_id='1235', queue='default', hour=1.5)
add_task(task_id='1235', queue='default', hour=1)
Launch every 1 hour and at hour:30 minutes (i.e. 1:30, 2:30 etc.)
add_task(task_id='1235', queue='default', hour=1, minute=30)
Launch every day at 22:30 (10:30 pm)
add_task(task_id='1235', queue='default', minute=30, hour=22, day=1)
add_task(task_id='1235', queue='default', minute=30, hour=22, day=1)
Launch every other day at 7:30 (7:30 am)
add_task(task_id='1235', queue='default', minute=30, hour=7, day=2)
Launch every Saturday at 8:30am
add_task(task_id='1235', queue='default', minute=30, hour=8, day='saturday')
add_task(task_id='1235', queue='default', minute=30, hour=7, day=2)
Launch every Saturday at 8:30am (notice `day=0`)
add_task(task_id='1235', queue='default', minute=30, hour=8, day=0, weekdays=['saturday'])
Launch every 2 hours on the weekends Saturday/Sunday (notice `day` is not passed)
add_task(task_id='1235', queue='default', hour=2, weekdays=['saturday', 'sunday'])
Launch once a month at the 5th of each month
add_task(task_id='1235', queue='default', month=1, day=5)
Launch once a year on March 4th of each month
add_task(task_id='1235', queue='default', year=1, month=3, day=4)
add_task(task_id='1235', queue='default', month=1, day=5)
Launch once a year on March 4th of each year
add_task(task_id='1235', queue='default', year=1, month=3, day=4)
:param task_id: Task/task ID to be cloned and scheduled for execution
:param schedule_task_id: Task/task ID to be cloned and scheduled for execution
:param schedule_function: Optional, instead of providing Task ID to be scheduled,
provide a function to be called. Notice the function is called from the scheduler context
(i.e. running on the same machine as the scheduler)
:param queue: Queue name or ID to put the Task into (i.e. schedule)
:param name: Name or description for the cron Task (should be unique if provided otherwise randomly generated)
:param target_project: Specify target project to put the cloned scheduled Task in.
:param minute: If specified launch Task at a specific minute of the day
:param hour: If specified launch Task at a specific hour (24h) of the day
:param day: If specified launch Task at a specific day
:param month: If specified launch Task at a specific month
:param minute: If specified launch Task at a specific minute of the day (Valid values 0-60)
:param hour: If specified launch Task at a specific hour (24h) of the day (Valid values 0-24)
:param day: If specified launch Task at a specific day (Valid values 1-31)
:param weekdays: If specified a list of week days to schedule the Task in (assuming day, not given)
:param month: If specified launch Task at a specific month (Valid values 1-12)
:param year: If specified launch Task at a specific year
:param limit_execution_time: Limit the execution time (in hours) of the specific job.
:param single_instance: If True, do not launch the Task job if the previous instance is still running
@ -242,11 +331,14 @@ class TaskScheduler(object):
:return: True if job is successfully added to the scheduling list
"""
task_id = task_id.id if isinstance(task_id, Task) else str(task_id)
mutually_exclusive(schedule_function=schedule_function, schedule_task_id=schedule_task_id)
task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or '')
# noinspection PyProtectedMember
job = ScheduleJob(
name=name or task_id,
base_task_id=task_id,
base_function=schedule_function,
queue=queue,
target_project=target_project,
execution_limit_hours=limit_execution_time,
@ -259,13 +351,12 @@ class TaskScheduler(object):
minute=minute,
hour=hour,
day=day,
weekdays=weekdays,
month=month,
year=year,
)
# make sure the queue is valid
if not job.queue:
self._log('Queue [], could not be found, skipping scheduled task'.format(queue), level='warning')
return False
# raise exception if not valid
job.verify()
self._schedule_jobs.append(job)
return True
@ -280,17 +371,22 @@ class TaskScheduler(object):
return self._schedule_jobs
def remove_task(self, task_id):
# type: (Union[str, Task]) -> bool
# type: (Union[str, Task, Callable]) -> bool
"""
Remove a Task ID from the scheduled task list.
:param task_id: Task or Task ID to be removed
:return: return True of the Task ID was found in the scheduled jobs list and was removed.
"""
task_id = task_id.id if isinstance(task_id, Task) else str(task_id)
if not any(t.base_task_id == task_id for t in self._schedule_jobs):
return False
self._schedule_jobs = [t.base_task_id != task_id for t in self._schedule_jobs]
if isinstance(task_id, (Task, str)):
task_id = task_id.id if isinstance(task_id, Task) else str(task_id)
if not any(t.base_task_id == task_id for t in self._schedule_jobs):
return False
self._schedule_jobs = [t for t in self._schedule_jobs if t.base_task_id != task_id]
else:
if not any(t.base_function == task_id for t in self._schedule_jobs):
return False
self._schedule_jobs = [t for t in self._schedule_jobs if t.base_function != task_id]
return True
def start(self):
@ -356,43 +452,14 @@ class TaskScheduler(object):
# check if this is a Task timeout check
if timeout_jobs and next_time_stamp == timeout_jobs[0]:
self._log('Aborting timeout job: {}'.format(timeout_jobs[0]))
# mark aborted
task_id = [k for k, v in self._timeout_jobs.items() if v == timeout_jobs[0]][0]
self._cancel_task(task_id=task_id)
self._timeout_jobs.pop(task_id, None)
else:
job = scheduled_jobs[0]
# check if this is a single instance, then we need to abort the Task
if job.single_instance and job.get_last_executed_task_id():
t = Task.get_task(task_id=job.get_last_executed_task_id())
if t.status in ('in_progress', 'queued'):
self._log(
'Skipping Task {} scheduling, previous Task instance {} still running'.format(
job.name, t.id
))
job.run(None)
return
# actually run the job
task_job = ClearmlJob(
base_task_id=job.base_task_id,
parameter_override=job.task_parameters,
task_overrides=job.task_overrides,
disable_clone_task=not job.clone_task,
allow_caching=False,
target_project=job.target_project,
)
self._log('Scheduling Job {}, Task {} on queue {}.'.format(
job.name, task_job.task_id(), job.queue))
if task_job.launch(queue_name=job.queue):
# mark as run
job.run(task_job.task_id())
self._executed_jobs.append(ExecutedJob(
name=job.name, task_id=task_job.task_id(), started=datetime.utcnow()))
# add timeout check
if job.get_execution_timeout():
# we should probably make sure we are not overwriting a Task
self._timeout_jobs[job.get_execution_timeout()] = task_job.task_id()
self._log('Launching job: {}'.format(scheduled_jobs[0]))
self._launch_job(scheduled_jobs[0])
self._update_execution_plots()
@ -477,6 +544,9 @@ class TaskScheduler(object):
current_scheduled_jobs[name].update(j) if name in current_scheduled_jobs else j
for name, j in scheduled_jobs.items()
]
# verify all jobs
for j in self._schedule_jobs:
j.verify()
def _serialize_schedule_into_string(self):
# type: () -> str
@ -496,7 +566,7 @@ class TaskScheduler(object):
# plot the schedule definition
columns = [
'name', 'base_task_id', 'target_project', 'queue',
'name', 'base_task_id', 'next_run', 'target_project', 'queue',
'minute', 'hour', 'day', 'month', 'year',
'starting_time', 'execution_limit_hours', 'recurring',
'single_instance', 'task_parameters', 'task_overrides', 'clone_task',
@ -504,25 +574,38 @@ class TaskScheduler(object):
scheduler_table = [columns]
for j in self._schedule_jobs:
j_dict = j.to_dict()
j_dict['next_run'] = j.next()
row = [
str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c))
str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or '')
for c in columns
]
row[1] = '<a href="{}">{}</a>'.format(
task_link_template.format(project='*', task=row[1]), row[1])
if row[1]:
row[1] = '<a href="{}">{}</a>'.format(
task_link_template.format(project='*', task=row[1]), row[1])
scheduler_table += [row]
# plot the already executed Tasks
executed_table = [['name', 'task id', 'started', 'finished']]
for executed_job in sorted(self._executed_jobs, key=lambda x: x.started, reverse=True):
if not executed_job.finished:
t = Task.get_task(task_id=executed_job.task_id)
if t.status not in ('in_progress', 'queued'):
executed_job.finished = t.data.completed or datetime.utcnow()
if executed_job.task_id:
t = Task.get_task(task_id=executed_job.task_id)
if t.status not in ('in_progress', 'queued'):
executed_job.finished = t.data.completed or datetime.utcnow()
elif executed_job.thread_id:
# noinspection PyBroadException
try:
a_thread = [t for t in enumerate_threads() if t.ident == executed_job.thread_id]
if not a_thread or not a_thread[0].is_alive():
executed_job.finished = datetime.utcnow()
except Exception:
pass
executed_table += [
[executed_job.name,
'<a href="{}">{}</a>'.format(
task_link_template.format(project='*', task=executed_job.task_id), executed_job.task_id),
'<a href="{}">{}</a>'.format(task_link_template.format(
project='*', task=executed_job.task_id), executed_job.task_id)
if executed_job.task_id else '',
str(executed_job.started).split('.', 1)[0], str(executed_job.finished).split('.', 1)[0]
]
]
@ -542,6 +625,83 @@ class TaskScheduler(object):
else:
print(message)
def _launch_job(self, job):
# type: (ScheduleJob) -> None
self._launch_job_task(job)
self._launch_job_function(job)
def _launch_job_task(self, job):
# type: (ScheduleJob) -> None
# make sure this is not a function job
if job.base_function:
return
# check if this is a single instance, then we need to abort the Task
if job.single_instance and job.get_last_executed_task_id():
t = Task.get_task(task_id=job.get_last_executed_task_id())
if t.status in ('in_progress', 'queued'):
self._log(
'Skipping Task {} scheduling, previous Task instance {} still running'.format(
job.name, t.id
))
job.run(None)
return
# actually run the job
task_job = ClearmlJob(
base_task_id=job.base_task_id,
parameter_override=job.task_parameters,
task_overrides=job.task_overrides,
disable_clone_task=not job.clone_task,
allow_caching=False,
target_project=job.target_project,
)
self._log('Scheduling Job {}, Task {} on queue {}.'.format(
job.name, task_job.task_id(), job.queue))
if task_job.launch(queue_name=job.queue):
# mark as run
job.run(task_job.task_id())
self._executed_jobs.append(ExecutedJob(
name=job.name, task_id=task_job.task_id(), started=datetime.utcnow()))
# add timeout check
if job.get_execution_timeout():
# we should probably make sure we are not overwriting a Task
self._timeout_jobs[job.get_execution_timeout()] = task_job.task_id()
def _launch_job_function(self, job):
# type: (ScheduleJob) -> None
# make sure this IS a function job
if not job.base_function:
return
# check if this is a single instance, then we need to abort the Task
if job.single_instance and job.get_last_executed_task_id():
# noinspection PyBroadException
try:
a_thread = [t for t in enumerate_threads() if t.ident == job.get_last_executed_task_id()]
if a_thread:
a_thread = a_thread[0]
except Exception:
a_thread = None
if a_thread and a_thread.is_alive():
self._log(
"Skipping Task '{}' scheduling, previous Thread instance '{}' still running".format(
job.name, a_thread.ident
))
job.run(None)
return
self._log("Scheduling Job '{}', Task '{}' on background thread".format(
job.name, job.base_function))
t = Thread(target=job.base_function)
t.start()
# mark as run
job.run(t.ident)
self._executed_jobs.append(ExecutedJob(
name=job.name, thread_id=str(t.ident), started=datetime.utcnow()))
# execution timeout is not supported with function callbacks.
@staticmethod
def _cancel_task(task_id):
# type: (str) -> ()
@ -551,4 +711,3 @@ class TaskScheduler(object):
t.stopped(force=True)
elif status in ('queued',):
Task.dequeue(t)