Black formatting

This commit is contained in:
allegroai 2024-02-09 10:37:27 +02:00
parent 27a8f2cd09
commit 2dab8ae149

View File

@ -8,8 +8,8 @@ from typing import List, Union, Optional, Callable, Sequence, Dict
from attr import attrs, attrib
from dateutil.relativedelta import relativedelta
from .job import ClearmlJob
from .controller import PipelineController
from .job import ClearmlJob
from ..backend_interface.util import datetime_from_isoformat, datetime_to_isoformat, mutually_exclusive
from ..task import Task
@ -28,12 +28,11 @@ class BaseScheduleJob(object):
_executed_instances = attrib(type=list, default=None)
def to_dict(self, full=False):
return {k: v for k, v in self.__dict__.items()
if not callable(v) and (full or not str(k).startswith('_'))}
return {k: v for k, v in self.__dict__.items() if not callable(v) and (full or not str(k).startswith("_"))}
def update(self, a_job):
# type: (Union[Dict, BaseScheduleJob]) -> BaseScheduleJob
converters = {a.name: a.converter for a in getattr(self, '__attrs_attrs__', [])}
converters = {a.name: a.converter for a in getattr(self, "__attrs_attrs__", [])}
for k, v in (a_job.to_dict(full=True) if not isinstance(a_job, dict) else a_job).items():
if v is not None and not callable(getattr(self, k, v)):
setattr(self, k, converters[k](v) if converters.get(k) else v)
@ -80,7 +79,7 @@ class BaseScheduleJob(object):
@attrs
class ScheduleJob(BaseScheduleJob):
_weekdays_ind = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday')
_weekdays_ind = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
execution_limit_hours = attrib(type=float, default=None)
recurring = attrib(type=bool, default=True)
@ -100,8 +99,7 @@ class ScheduleJob(BaseScheduleJob):
# type: () -> None
def check_integer(value):
try:
return False if not isinstance(value, (int, float)) or \
int(value) != float(value) else True
return False if not isinstance(value, (int, float)) or int(value) != float(value) else True
except (TypeError, ValueError):
return False
@ -155,7 +153,7 @@ class ScheduleJob(BaseScheduleJob):
month=int(self.month or 1),
day=int(self.day or 1),
hour=int(self.hour or 0),
minute=int(self.minute or 0)
minute=int(self.minute or 0),
)
if self.weekdays:
self._next_run += relativedelta(weekday=self.get_weekday_ord(self.weekdays[0]))
@ -172,7 +170,7 @@ class ScheduleJob(BaseScheduleJob):
except ValueError:
# in case previous execution was not in the weekday (for example executed immediately at scheduling)
prev_weekday_ind = -1
weekday = _weekdays[(prev_weekday_ind+1) % len(_weekdays)]
weekday = _weekdays[(prev_weekday_ind + 1) % len(_weekdays)]
prev_timestamp = self._last_executed or self.starting_time
# fix first scheduled job should be as close as possible to starting time
@ -184,8 +182,8 @@ class ScheduleJob(BaseScheduleJob):
# if this is a daily schedule and we can still run it today, then we should
run0 = self._calc_next_run(self.starting_time, weekday)
run1 = self._calc_next_run(run0, weekday)
delta = run1-run0
optional_first_timestamp = self._calc_next_run(prev_timestamp-delta, weekday)
delta = run1 - run0
optional_first_timestamp = self._calc_next_run(prev_timestamp - delta, weekday)
if optional_first_timestamp > prev_timestamp:
# this is us, we can still run it
self._next_run = optional_first_timestamp
@ -236,7 +234,7 @@ class ScheduleJob(BaseScheduleJob):
months=0 if self.year else (self.month or 0),
hours=self.hour or 0,
minutes=self.minute or 0,
weekday=weekday
weekday=weekday,
)
return next_timestamp
@ -283,7 +281,7 @@ class ScheduleJob(BaseScheduleJob):
if self.execution_limit_hours and task_id:
self._execution_timeout = self._last_executed + relativedelta(
hours=int(self.execution_limit_hours),
minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60)
minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60),
)
else:
self._execution_timeout = None
@ -306,16 +304,16 @@ class ExecutedJob(object):
thread_id = attrib(type=str, default=None)
def to_dict(self, full=False):
return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith('_')}
return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith("_")}
class BaseScheduler(object):
def __init__(
self,
sync_frequency_minutes=15,
force_create_task_name=None,
force_create_task_project=None,
pooling_frequency_minutes=None
self,
sync_frequency_minutes=15,
force_create_task_name=None,
force_create_task_project=None,
pooling_frequency_minutes=None
):
# type: (float, Optional[str], Optional[str], Optional[float]) -> None
"""
@ -333,8 +331,8 @@ class BaseScheduler(object):
self._sync_frequency_minutes = sync_frequency_minutes
if force_create_task_name or not Task.current_task():
self._task = Task.init(
project_name=force_create_task_project or 'DevOps',
task_name=force_create_task_name or 'Scheduler',
project_name=force_create_task_project or "DevOps",
task_name=force_create_task_name or "Scheduler",
task_type=Task.TaskTypes.service,
auto_resource_monitoring=False,
)
@ -356,12 +354,12 @@ class BaseScheduler(object):
while True:
# sync with backend
try:
if time() - self._last_sync > 60. * self._sync_frequency_minutes:
if time() - self._last_sync > 60.0 * self._sync_frequency_minutes:
self._last_sync = time()
self._deserialize()
self._update_execution_plots()
except Exception as ex:
self._log('Warning: Exception caught during deserialization: {}'.format(ex))
self._log("Warning: Exception caught during deserialization: {}".format(ex))
self._last_sync = time()
try:
@ -369,16 +367,16 @@ class BaseScheduler(object):
self._serialize_state()
self._update_execution_plots()
except Exception as ex:
self._log('Warning: Exception caught during scheduling step: {}'.format(ex))
self._log("Warning: Exception caught during scheduling step: {}".format(ex))
# rate control
sleep(15)
# sleep until the next pool (default None)
if self._pooling_frequency_minutes:
self._log("Sleeping until the next pool in {} minutes".format(self._pooling_frequency_minutes))
sleep(self._pooling_frequency_minutes*60.)
sleep(self._pooling_frequency_minutes * 60.0)
def start_remotely(self, queue='services'):
def start_remotely(self, queue="services"):
# type: (str) -> None
"""
Start the Task TaskScheduler loop (notice this function does not return)
@ -456,11 +454,8 @@ class BaseScheduler(object):
# check if this is a single instance, then we need to abort the Task
if job.single_instance and job.get_last_executed_task_id():
t = Task.get_task(task_id=job.get_last_executed_task_id())
if t.status in ('in_progress', 'queued'):
self._log(
'Skipping Task {} scheduling, previous Task instance {} still running'.format(
job.name, t.id
))
if t.status in ("in_progress", "queued"):
self._log("Skipping Task {} scheduling, previous Task instance {} still running".format(job.name, t.id))
job.run(None)
return None
@ -474,8 +469,7 @@ class BaseScheduler(object):
target_project=job.get_resolved_target_project(),
tags=[add_tags] if add_tags and isinstance(add_tags, str) else add_tags,
)
self._log('Scheduling Job {}, Task {} on queue {}.'.format(
job.name, task_job.task_id(), job.queue))
self._log("Scheduling Job {}, Task {} on queue {}.".format(job.name, task_job.task_id(), job.queue))
if task_job.launch(queue_name=job.queue):
# mark as run
job.run(task_job.task_id())
@ -501,12 +495,12 @@ class BaseScheduler(object):
self._log(
"Skipping Task '{}' scheduling, previous Thread instance '{}' still running".format(
job.name, a_thread.ident
))
)
)
job.run(None)
return None
self._log("Scheduling Job '{}', Task '{}' on background thread".format(
job.name, job.base_function))
self._log("Scheduling Job '{}', Task '{}' on background thread".format(job.name, job.base_function))
t = Thread(target=job.base_function, args=func_args or ())
t.start()
# mark as run
@ -520,9 +514,9 @@ class BaseScheduler(object):
return
t = Task.get_task(task_id=task_id)
status = t.status
if status in ('in_progress',):
if status in ("in_progress",):
t.stopped(force=True)
elif status in ('queued',):
elif status in ("queued",):
Task.dequeue(t)
@ -531,7 +525,8 @@ class TaskScheduler(BaseScheduler):
Task Scheduling controller.
Notice time-zone is ALWAYS UTC
"""
_configuration_section = 'schedule'
_configuration_section = "schedule"
def __init__(self, sync_frequency_minutes=15, force_create_task_name=None, force_create_task_project=None):
# type: (float, Optional[str], Optional[str]) -> None
@ -548,32 +543,32 @@ class TaskScheduler(BaseScheduler):
super(TaskScheduler, self).__init__(
sync_frequency_minutes=sync_frequency_minutes,
force_create_task_name=force_create_task_name,
force_create_task_project=force_create_task_project
force_create_task_project=force_create_task_project,
)
self._schedule_jobs = [] # List[ScheduleJob]
self._timeout_jobs = {} # Dict[datetime, str]
self._executed_jobs = [] # List[ExecutedJob]
def add_task(
self,
schedule_task_id=None, # type: Union[str, Task]
schedule_function=None, # type: Callable
queue=None, # type: str
name=None, # type: Optional[str]
target_project=None, # type: Optional[str]
minute=None, # type: Optional[int]
hour=None, # type: Optional[int]
day=None, # type: Optional[int]
weekdays=None, # type: Optional[List[str]]
month=None, # type: Optional[int]
year=None, # type: Optional[int]
limit_execution_time=None, # type: Optional[float]
single_instance=False, # type: bool
recurring=True, # type: bool
execute_immediately=False, # type: bool
reuse_task=False, # type: bool
task_parameters=None, # type: Optional[dict]
task_overrides=None, # type: Optional[dict]
self,
schedule_task_id=None, # type: Union[str, Task]
schedule_function=None, # type: Callable
queue=None, # type: str
name=None, # type: Optional[str]
target_project=None, # type: Optional[str]
minute=None, # type: Optional[int]
hour=None, # type: Optional[int]
day=None, # type: Optional[int]
weekdays=None, # type: Optional[List[str]]
month=None, # type: Optional[int]
year=None, # type: Optional[int]
limit_execution_time=None, # type: Optional[float]
single_instance=False, # type: bool
recurring=True, # type: bool
execute_immediately=False, # type: bool
reuse_task=False, # type: bool
task_parameters=None, # type: Optional[dict]
task_overrides=None, # type: Optional[dict]
):
# type: (...) -> bool
"""
@ -640,7 +635,7 @@ class TaskScheduler(BaseScheduler):
:return: True if job is successfully added to the scheduling list
"""
mutually_exclusive(schedule_function=schedule_function, schedule_task_id=schedule_task_id)
task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or '')
task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or "")
# noinspection PyProtectedMember
job = ScheduleJob(
@ -715,46 +710,46 @@ class TaskScheduler(BaseScheduler):
# get idle timeout (aka sleeping)
scheduled_jobs = sorted(
[j for j in self._schedule_jobs if j.next_run() is not None],
key=lambda x: x.next_run()
[j for j in self._schedule_jobs if j.next_run() is not None], key=lambda x: x.next_run()
)
# sort by key
timeout_job_datetime = min(self._timeout_jobs, key=self._timeout_jobs.get) if self._timeout_jobs else None
if not scheduled_jobs and timeout_job_datetime is None:
# sleep and retry
seconds = 60. * self._sync_frequency_minutes
self._log('Nothing to do, sleeping for {:.2f} minutes.'.format(seconds / 60.))
seconds = 60.0 * self._sync_frequency_minutes
self._log("Nothing to do, sleeping for {:.2f} minutes.".format(seconds / 60.0))
sleep(seconds)
return False
next_time_stamp = scheduled_jobs[0].next_run() if scheduled_jobs else None
if timeout_job_datetime is not None:
next_time_stamp = (
min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime
)
next_time_stamp = min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime
sleep_time = (next_time_stamp - datetime.utcnow()).total_seconds()
if sleep_time > 0:
# sleep until we need to run a job or maximum sleep time
seconds = min(sleep_time, 60. * self._sync_frequency_minutes)
self._log('Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.'.format(
next_time_stamp, seconds / 60.))
seconds = min(sleep_time, 60.0 * self._sync_frequency_minutes)
self._log(
"Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.".format(
next_time_stamp, seconds / 60.0
)
)
sleep(seconds)
return False
# check if this is a Task timeout check
if timeout_job_datetime is not None and next_time_stamp == timeout_job_datetime:
task_id = self._timeout_jobs[timeout_job_datetime]
self._log('Aborting job due to timeout: {}'.format(task_id))
self._log("Aborting job due to timeout: {}".format(task_id))
self._cancel_task(task_id=task_id)
self._timeout_jobs.pop(timeout_job_datetime, None)
else:
self._log('Launching job: {}'.format(scheduled_jobs[0]))
self._log("Launching job: {}".format(scheduled_jobs[0]))
self._launch_job(scheduled_jobs[0])
return True
def start_remotely(self, queue='services'):
def start_remotely(self, queue="services"):
# type: (str) -> None
"""
Start the Task TaskScheduler loop (notice this function does not return)
@ -770,8 +765,8 @@ class TaskScheduler(BaseScheduler):
"""
# noinspection PyProtectedMember
self._task._set_configuration(
config_type='json',
description='schedule tasks configuration',
config_type="json",
description="schedule tasks configuration",
config_text=self._serialize_schedule_into_string(),
name=self._configuration_section,
)
@ -785,9 +780,9 @@ class TaskScheduler(BaseScheduler):
dict(
scheduled_jobs=[j.to_dict(full=True) for j in self._schedule_jobs],
timeout_jobs={datetime_to_isoformat(k): v for k, v in self._timeout_jobs.items()},
executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs]
executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs],
),
default=datetime_to_isoformat
default=datetime_to_isoformat,
)
self._task.upload_artifact(name="state", artifact_object=json_str, preview="scheduler internal state")
@ -798,34 +793,32 @@ class TaskScheduler(BaseScheduler):
"""
# get artifact
self._task.reload()
artifact_object = self._task.artifacts.get('state')
artifact_object = self._task.artifacts.get("state")
if artifact_object is not None:
state_json_str = artifact_object.get(force_download=True)
if state_json_str is not None:
state_dict = json.loads(state_json_str)
self._schedule_jobs = self.__deserialize_scheduled_jobs(
serialized_jobs_dicts=state_dict.get('scheduled_jobs', []),
current_jobs=self._schedule_jobs
serialized_jobs_dicts=state_dict.get("scheduled_jobs", []), current_jobs=self._schedule_jobs
)
self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get('timeout_jobs') or {})}
self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get('executed_jobs', [])]
self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get("timeout_jobs") or {})}
self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get("executed_jobs", [])]
def _deserialize(self):
# type: () -> None
"""
Deserialize Task scheduling configuration only
"""
self._log('Syncing scheduler')
self._log("Syncing scheduler")
self._task.reload()
# noinspection PyProtectedMember
json_str = self._task._get_configuration_text(name=self._configuration_section)
try:
self._schedule_jobs = self.__deserialize_scheduled_jobs(
serialized_jobs_dicts=json.loads(json_str),
current_jobs=self._schedule_jobs
serialized_jobs_dicts=json.loads(json_str), current_jobs=self._schedule_jobs
)
except Exception as ex:
self._log('Failed deserializing configuration: {}'.format(ex), level=logging.WARN)
self._log("Failed deserializing configuration: {}".format(ex), level=logging.WARN)
return
@staticmethod
@ -858,45 +851,61 @@ class TaskScheduler(BaseScheduler):
if not self._task:
return
task_link_template = self._task.get_output_log_web_page() \
.replace('/{}/'.format(self._task.project), '/{project}/') \
.replace('/{}/'.format(self._task.id), '/{task}/')
task_link_template = (
self._task.get_output_log_web_page()
.replace("/{}/".format(self._task.project), "/{project}/")
.replace("/{}/".format(self._task.id), "/{task}/")
)
# plot the schedule definition
columns = [
'name', 'base_task_id', 'base_function', 'next_run', 'target_project', 'queue',
'minute', 'hour', 'day', 'month', 'year',
'starting_time', 'execution_limit_hours', 'recurring',
'single_instance', 'task_parameters', 'task_overrides', 'clone_task',
"name",
"base_task_id",
"base_function",
"next_run",
"target_project",
"queue",
"minute",
"hour",
"day",
"month",
"year",
"starting_time",
"execution_limit_hours",
"recurring",
"single_instance",
"task_parameters",
"task_overrides",
"clone_task",
]
scheduler_table = [columns]
for j in self._schedule_jobs:
j_dict = j.to_dict()
j_dict['next_run'] = j.next()
j_dict['base_function'] = "{}.{}".format(
getattr(j.base_function, '__module__', ''),
getattr(j.base_function, '__name__', '')
) if j.base_function else ''
j_dict["next_run"] = j.next()
j_dict["base_function"] = (
"{}.{}".format(getattr(j.base_function, "__module__", ""), getattr(j.base_function, "__name__", ""))
if j.base_function
else ""
)
if not j_dict.get('base_task_id'):
j_dict['clone_task'] = ''
if not j_dict.get("base_task_id"):
j_dict["clone_task"] = ""
row = [
str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or '')
str(j_dict.get(c)).split(".", 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or "")
for c in columns
]
if row[1]:
row[1] = '<a href="{}">{}</a>'.format(
task_link_template.format(project='*', task=row[1]), row[1])
row[1] = '<a href="{}">{}</a>'.format(task_link_template.format(project="*", task=row[1]), row[1])
scheduler_table += [row]
# plot the already executed Tasks
executed_table = [['name', 'task id', 'started', 'finished']]
executed_table = [["name", "task id", "started", "finished"]]
for executed_job in sorted(self._executed_jobs, key=lambda x: x.started, reverse=True):
if not executed_job.finished:
if executed_job.task_id:
t = Task.get_task(task_id=executed_job.task_id)
if t.status not in ('in_progress', 'queued'):
if t.status not in ("in_progress", "queued"):
executed_job.finished = t.data.completed or datetime.utcnow()
elif executed_job.thread_id:
# noinspection PyBroadException
@ -908,30 +917,31 @@ class TaskScheduler(BaseScheduler):
pass
executed_table += [
[executed_job.name,
'<a href="{}">{}</a>'.format(task_link_template.format(
project='*', task=executed_job.task_id), executed_job.task_id)
if executed_job.task_id else 'function',
str(executed_job.started).split('.', 1)[0], str(executed_job.finished).split('.', 1)[0]
]
[
executed_job.name,
'<a href="{}">{}</a>'.format(
task_link_template.format(project="*", task=executed_job.task_id), executed_job.task_id
)
if executed_job.task_id
else "function",
str(executed_job.started).split(".", 1)[0],
str(executed_job.finished).split(".", 1)[0],
]
]
self._task.get_logger().report_table(
title='Schedule Tasks', series=' ', iteration=0,
table_plot=scheduler_table
)
self._task.get_logger().report_table(
title='Executed Tasks', series=' ', iteration=0,
table_plot=executed_table
title="Schedule Tasks", series=" ", iteration=0, table_plot=scheduler_table
)
self._task.get_logger().report_table(title="Executed Tasks", series=" ", iteration=0, table_plot=executed_table)
def _launch_job_task(self, job, task_parameters=None, add_tags=None):
# type: (ScheduleJob, Optional[dict], Optional[List[str]]) -> Optional[ClearmlJob]
task_job = super(TaskScheduler, self)._launch_job_task(job, task_parameters=task_parameters, add_tags=add_tags)
# make sure this is not a function job
if task_job:
self._executed_jobs.append(ExecutedJob(
name=job.name, task_id=task_job.task_id(), started=datetime.utcnow()))
self._executed_jobs.append(
ExecutedJob(name=job.name, task_id=task_job.task_id(), started=datetime.utcnow())
)
# add timeout check
if job.get_execution_timeout():
# we should probably make sure we are not overwriting a Task
@ -943,8 +953,9 @@ class TaskScheduler(BaseScheduler):
thread_job = super(TaskScheduler, self)._launch_job_function(job, func_args=func_args)
# make sure this is not a function job
if thread_job:
self._executed_jobs.append(ExecutedJob(
name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow()))
self._executed_jobs.append(
ExecutedJob(name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow())
)
# execution timeout is not supported with function callbacks.
return thread_job