From 2dab8ae1499aa657d2574b9a9a2d34da445343c4 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 9 Feb 2024 10:37:27 +0200 Subject: [PATCH] Black formatting --- clearml/automation/scheduler.py | 253 +++++++++++++++++--------------- 1 file changed, 132 insertions(+), 121 deletions(-) diff --git a/clearml/automation/scheduler.py b/clearml/automation/scheduler.py index d09e52ee..545c92b7 100644 --- a/clearml/automation/scheduler.py +++ b/clearml/automation/scheduler.py @@ -8,8 +8,8 @@ from typing import List, Union, Optional, Callable, Sequence, Dict from attr import attrs, attrib from dateutil.relativedelta import relativedelta -from .job import ClearmlJob from .controller import PipelineController +from .job import ClearmlJob from ..backend_interface.util import datetime_from_isoformat, datetime_to_isoformat, mutually_exclusive from ..task import Task @@ -28,12 +28,11 @@ class BaseScheduleJob(object): _executed_instances = attrib(type=list, default=None) def to_dict(self, full=False): - return {k: v for k, v in self.__dict__.items() - if not callable(v) and (full or not str(k).startswith('_'))} + return {k: v for k, v in self.__dict__.items() if not callable(v) and (full or not str(k).startswith("_"))} def update(self, a_job): # type: (Union[Dict, BaseScheduleJob]) -> BaseScheduleJob - converters = {a.name: a.converter for a in getattr(self, '__attrs_attrs__', [])} + converters = {a.name: a.converter for a in getattr(self, "__attrs_attrs__", [])} for k, v in (a_job.to_dict(full=True) if not isinstance(a_job, dict) else a_job).items(): if v is not None and not callable(getattr(self, k, v)): setattr(self, k, converters[k](v) if converters.get(k) else v) @@ -80,7 +79,7 @@ class BaseScheduleJob(object): @attrs class ScheduleJob(BaseScheduleJob): - _weekdays_ind = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday') + _weekdays_ind = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday") execution_limit_hours = attrib(type=float, default=None) recurring = attrib(type=bool, default=True) @@ -100,8 +99,7 @@ class ScheduleJob(BaseScheduleJob): # type: () -> None def check_integer(value): try: - return False if not isinstance(value, (int, float)) or \ - int(value) != float(value) else True + return False if not isinstance(value, (int, float)) or int(value) != float(value) else True except (TypeError, ValueError): return False @@ -155,7 +153,7 @@ class ScheduleJob(BaseScheduleJob): month=int(self.month or 1), day=int(self.day or 1), hour=int(self.hour or 0), - minute=int(self.minute or 0) + minute=int(self.minute or 0), ) if self.weekdays: self._next_run += relativedelta(weekday=self.get_weekday_ord(self.weekdays[0])) @@ -172,7 +170,7 @@ class ScheduleJob(BaseScheduleJob): except ValueError: # in case previous execution was not in the weekday (for example executed immediately at scheduling) prev_weekday_ind = -1 - weekday = _weekdays[(prev_weekday_ind+1) % len(_weekdays)] + weekday = _weekdays[(prev_weekday_ind + 1) % len(_weekdays)] prev_timestamp = self._last_executed or self.starting_time # fix first scheduled job should be as close as possible to starting time @@ -184,8 +182,8 @@ class ScheduleJob(BaseScheduleJob): # if this is a daily schedule and we can still run it today, then we should run0 = self._calc_next_run(self.starting_time, weekday) run1 = self._calc_next_run(run0, weekday) - delta = run1-run0 - optional_first_timestamp = self._calc_next_run(prev_timestamp-delta, weekday) + delta = run1 - run0 + optional_first_timestamp = self._calc_next_run(prev_timestamp - delta, weekday) if optional_first_timestamp > prev_timestamp: # this is us, we can still run it self._next_run = optional_first_timestamp @@ -236,7 +234,7 @@ class ScheduleJob(BaseScheduleJob): months=0 if self.year else (self.month or 0), hours=self.hour or 0, minutes=self.minute or 0, - weekday=weekday + weekday=weekday, ) return next_timestamp @@ -283,7 +281,7 @@ class ScheduleJob(BaseScheduleJob): if self.execution_limit_hours and task_id: self._execution_timeout = self._last_executed + relativedelta( hours=int(self.execution_limit_hours), - minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60) + minutes=int((self.execution_limit_hours - int(self.execution_limit_hours)) * 60), ) else: self._execution_timeout = None @@ -306,16 +304,16 @@ class ExecutedJob(object): thread_id = attrib(type=str, default=None) def to_dict(self, full=False): - return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith('_')} + return {k: v for k, v in self.__dict__.items() if full or not str(k).startswith("_")} class BaseScheduler(object): def __init__( - self, - sync_frequency_minutes=15, - force_create_task_name=None, - force_create_task_project=None, - pooling_frequency_minutes=None + self, + sync_frequency_minutes=15, + force_create_task_name=None, + force_create_task_project=None, + pooling_frequency_minutes=None ): # type: (float, Optional[str], Optional[str], Optional[float]) -> None """ @@ -333,8 +331,8 @@ class BaseScheduler(object): self._sync_frequency_minutes = sync_frequency_minutes if force_create_task_name or not Task.current_task(): self._task = Task.init( - project_name=force_create_task_project or 'DevOps', - task_name=force_create_task_name or 'Scheduler', + project_name=force_create_task_project or "DevOps", + task_name=force_create_task_name or "Scheduler", task_type=Task.TaskTypes.service, auto_resource_monitoring=False, ) @@ -356,12 +354,12 @@ class BaseScheduler(object): while True: # sync with backend try: - if time() - self._last_sync > 60. * self._sync_frequency_minutes: + if time() - self._last_sync > 60.0 * self._sync_frequency_minutes: self._last_sync = time() self._deserialize() self._update_execution_plots() except Exception as ex: - self._log('Warning: Exception caught during deserialization: {}'.format(ex)) + self._log("Warning: Exception caught during deserialization: {}".format(ex)) self._last_sync = time() try: @@ -369,16 +367,16 @@ class BaseScheduler(object): self._serialize_state() self._update_execution_plots() except Exception as ex: - self._log('Warning: Exception caught during scheduling step: {}'.format(ex)) + self._log("Warning: Exception caught during scheduling step: {}".format(ex)) # rate control sleep(15) # sleep until the next pool (default None) if self._pooling_frequency_minutes: self._log("Sleeping until the next pool in {} minutes".format(self._pooling_frequency_minutes)) - sleep(self._pooling_frequency_minutes*60.) + sleep(self._pooling_frequency_minutes * 60.0) - def start_remotely(self, queue='services'): + def start_remotely(self, queue="services"): # type: (str) -> None """ Start the Task TaskScheduler loop (notice this function does not return) @@ -456,11 +454,8 @@ class BaseScheduler(object): # check if this is a single instance, then we need to abort the Task if job.single_instance and job.get_last_executed_task_id(): t = Task.get_task(task_id=job.get_last_executed_task_id()) - if t.status in ('in_progress', 'queued'): - self._log( - 'Skipping Task {} scheduling, previous Task instance {} still running'.format( - job.name, t.id - )) + if t.status in ("in_progress", "queued"): + self._log("Skipping Task {} scheduling, previous Task instance {} still running".format(job.name, t.id)) job.run(None) return None @@ -474,8 +469,7 @@ class BaseScheduler(object): target_project=job.get_resolved_target_project(), tags=[add_tags] if add_tags and isinstance(add_tags, str) else add_tags, ) - self._log('Scheduling Job {}, Task {} on queue {}.'.format( - job.name, task_job.task_id(), job.queue)) + self._log("Scheduling Job {}, Task {} on queue {}.".format(job.name, task_job.task_id(), job.queue)) if task_job.launch(queue_name=job.queue): # mark as run job.run(task_job.task_id()) @@ -501,12 +495,12 @@ class BaseScheduler(object): self._log( "Skipping Task '{}' scheduling, previous Thread instance '{}' still running".format( job.name, a_thread.ident - )) + ) + ) job.run(None) return None - self._log("Scheduling Job '{}', Task '{}' on background thread".format( - job.name, job.base_function)) + self._log("Scheduling Job '{}', Task '{}' on background thread".format(job.name, job.base_function)) t = Thread(target=job.base_function, args=func_args or ()) t.start() # mark as run @@ -520,9 +514,9 @@ class BaseScheduler(object): return t = Task.get_task(task_id=task_id) status = t.status - if status in ('in_progress',): + if status in ("in_progress",): t.stopped(force=True) - elif status in ('queued',): + elif status in ("queued",): Task.dequeue(t) @@ -531,7 +525,8 @@ class TaskScheduler(BaseScheduler): Task Scheduling controller. Notice time-zone is ALWAYS UTC """ - _configuration_section = 'schedule' + + _configuration_section = "schedule" def __init__(self, sync_frequency_minutes=15, force_create_task_name=None, force_create_task_project=None): # type: (float, Optional[str], Optional[str]) -> None @@ -548,32 +543,32 @@ class TaskScheduler(BaseScheduler): super(TaskScheduler, self).__init__( sync_frequency_minutes=sync_frequency_minutes, force_create_task_name=force_create_task_name, - force_create_task_project=force_create_task_project + force_create_task_project=force_create_task_project, ) self._schedule_jobs = [] # List[ScheduleJob] self._timeout_jobs = {} # Dict[datetime, str] self._executed_jobs = [] # List[ExecutedJob] def add_task( - self, - schedule_task_id=None, # type: Union[str, Task] - schedule_function=None, # type: Callable - queue=None, # type: str - name=None, # type: Optional[str] - target_project=None, # type: Optional[str] - minute=None, # type: Optional[int] - hour=None, # type: Optional[int] - day=None, # type: Optional[int] - weekdays=None, # type: Optional[List[str]] - month=None, # type: Optional[int] - year=None, # type: Optional[int] - limit_execution_time=None, # type: Optional[float] - single_instance=False, # type: bool - recurring=True, # type: bool - execute_immediately=False, # type: bool - reuse_task=False, # type: bool - task_parameters=None, # type: Optional[dict] - task_overrides=None, # type: Optional[dict] + self, + schedule_task_id=None, # type: Union[str, Task] + schedule_function=None, # type: Callable + queue=None, # type: str + name=None, # type: Optional[str] + target_project=None, # type: Optional[str] + minute=None, # type: Optional[int] + hour=None, # type: Optional[int] + day=None, # type: Optional[int] + weekdays=None, # type: Optional[List[str]] + month=None, # type: Optional[int] + year=None, # type: Optional[int] + limit_execution_time=None, # type: Optional[float] + single_instance=False, # type: bool + recurring=True, # type: bool + execute_immediately=False, # type: bool + reuse_task=False, # type: bool + task_parameters=None, # type: Optional[dict] + task_overrides=None, # type: Optional[dict] ): # type: (...) -> bool """ @@ -640,7 +635,7 @@ class TaskScheduler(BaseScheduler): :return: True if job is successfully added to the scheduling list """ mutually_exclusive(schedule_function=schedule_function, schedule_task_id=schedule_task_id) - task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or '') + task_id = schedule_task_id.id if isinstance(schedule_task_id, Task) else str(schedule_task_id or "") # noinspection PyProtectedMember job = ScheduleJob( @@ -715,46 +710,46 @@ class TaskScheduler(BaseScheduler): # get idle timeout (aka sleeping) scheduled_jobs = sorted( - [j for j in self._schedule_jobs if j.next_run() is not None], - key=lambda x: x.next_run() + [j for j in self._schedule_jobs if j.next_run() is not None], key=lambda x: x.next_run() ) # sort by key timeout_job_datetime = min(self._timeout_jobs, key=self._timeout_jobs.get) if self._timeout_jobs else None if not scheduled_jobs and timeout_job_datetime is None: # sleep and retry - seconds = 60. * self._sync_frequency_minutes - self._log('Nothing to do, sleeping for {:.2f} minutes.'.format(seconds / 60.)) + seconds = 60.0 * self._sync_frequency_minutes + self._log("Nothing to do, sleeping for {:.2f} minutes.".format(seconds / 60.0)) sleep(seconds) return False next_time_stamp = scheduled_jobs[0].next_run() if scheduled_jobs else None if timeout_job_datetime is not None: - next_time_stamp = ( - min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime - ) + next_time_stamp = min(next_time_stamp, timeout_job_datetime) if next_time_stamp else timeout_job_datetime sleep_time = (next_time_stamp - datetime.utcnow()).total_seconds() if sleep_time > 0: # sleep until we need to run a job or maximum sleep time - seconds = min(sleep_time, 60. * self._sync_frequency_minutes) - self._log('Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.'.format( - next_time_stamp, seconds / 60.)) + seconds = min(sleep_time, 60.0 * self._sync_frequency_minutes) + self._log( + "Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.".format( + next_time_stamp, seconds / 60.0 + ) + ) sleep(seconds) return False # check if this is a Task timeout check if timeout_job_datetime is not None and next_time_stamp == timeout_job_datetime: task_id = self._timeout_jobs[timeout_job_datetime] - self._log('Aborting job due to timeout: {}'.format(task_id)) + self._log("Aborting job due to timeout: {}".format(task_id)) self._cancel_task(task_id=task_id) self._timeout_jobs.pop(timeout_job_datetime, None) else: - self._log('Launching job: {}'.format(scheduled_jobs[0])) + self._log("Launching job: {}".format(scheduled_jobs[0])) self._launch_job(scheduled_jobs[0]) return True - def start_remotely(self, queue='services'): + def start_remotely(self, queue="services"): # type: (str) -> None """ Start the Task TaskScheduler loop (notice this function does not return) @@ -770,8 +765,8 @@ class TaskScheduler(BaseScheduler): """ # noinspection PyProtectedMember self._task._set_configuration( - config_type='json', - description='schedule tasks configuration', + config_type="json", + description="schedule tasks configuration", config_text=self._serialize_schedule_into_string(), name=self._configuration_section, ) @@ -785,9 +780,9 @@ class TaskScheduler(BaseScheduler): dict( scheduled_jobs=[j.to_dict(full=True) for j in self._schedule_jobs], timeout_jobs={datetime_to_isoformat(k): v for k, v in self._timeout_jobs.items()}, - executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs] + executed_jobs=[j.to_dict(full=True) for j in self._executed_jobs], ), - default=datetime_to_isoformat + default=datetime_to_isoformat, ) self._task.upload_artifact(name="state", artifact_object=json_str, preview="scheduler internal state") @@ -798,34 +793,32 @@ class TaskScheduler(BaseScheduler): """ # get artifact self._task.reload() - artifact_object = self._task.artifacts.get('state') + artifact_object = self._task.artifacts.get("state") if artifact_object is not None: state_json_str = artifact_object.get(force_download=True) if state_json_str is not None: state_dict = json.loads(state_json_str) self._schedule_jobs = self.__deserialize_scheduled_jobs( - serialized_jobs_dicts=state_dict.get('scheduled_jobs', []), - current_jobs=self._schedule_jobs + serialized_jobs_dicts=state_dict.get("scheduled_jobs", []), current_jobs=self._schedule_jobs ) - self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get('timeout_jobs') or {})} - self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get('executed_jobs', [])] + self._timeout_jobs = {datetime_from_isoformat(k): v for k, v in (state_dict.get("timeout_jobs") or {})} + self._executed_jobs = [ExecutedJob(**j) for j in state_dict.get("executed_jobs", [])] def _deserialize(self): # type: () -> None """ Deserialize Task scheduling configuration only """ - self._log('Syncing scheduler') + self._log("Syncing scheduler") self._task.reload() # noinspection PyProtectedMember json_str = self._task._get_configuration_text(name=self._configuration_section) try: self._schedule_jobs = self.__deserialize_scheduled_jobs( - serialized_jobs_dicts=json.loads(json_str), - current_jobs=self._schedule_jobs + serialized_jobs_dicts=json.loads(json_str), current_jobs=self._schedule_jobs ) except Exception as ex: - self._log('Failed deserializing configuration: {}'.format(ex), level=logging.WARN) + self._log("Failed deserializing configuration: {}".format(ex), level=logging.WARN) return @staticmethod @@ -858,45 +851,61 @@ class TaskScheduler(BaseScheduler): if not self._task: return - task_link_template = self._task.get_output_log_web_page() \ - .replace('/{}/'.format(self._task.project), '/{project}/') \ - .replace('/{}/'.format(self._task.id), '/{task}/') + task_link_template = ( + self._task.get_output_log_web_page() + .replace("/{}/".format(self._task.project), "/{project}/") + .replace("/{}/".format(self._task.id), "/{task}/") + ) # plot the schedule definition columns = [ - 'name', 'base_task_id', 'base_function', 'next_run', 'target_project', 'queue', - 'minute', 'hour', 'day', 'month', 'year', - 'starting_time', 'execution_limit_hours', 'recurring', - 'single_instance', 'task_parameters', 'task_overrides', 'clone_task', + "name", + "base_task_id", + "base_function", + "next_run", + "target_project", + "queue", + "minute", + "hour", + "day", + "month", + "year", + "starting_time", + "execution_limit_hours", + "recurring", + "single_instance", + "task_parameters", + "task_overrides", + "clone_task", ] scheduler_table = [columns] for j in self._schedule_jobs: j_dict = j.to_dict() - j_dict['next_run'] = j.next() - j_dict['base_function'] = "{}.{}".format( - getattr(j.base_function, '__module__', ''), - getattr(j.base_function, '__name__', '') - ) if j.base_function else '' + j_dict["next_run"] = j.next() + j_dict["base_function"] = ( + "{}.{}".format(getattr(j.base_function, "__module__", ""), getattr(j.base_function, "__name__", "")) + if j.base_function + else "" + ) - if not j_dict.get('base_task_id'): - j_dict['clone_task'] = '' + if not j_dict.get("base_task_id"): + j_dict["clone_task"] = "" row = [ - str(j_dict.get(c)).split('.', 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or '') + str(j_dict.get(c)).split(".", 1)[0] if isinstance(j_dict.get(c), datetime) else str(j_dict.get(c) or "") for c in columns ] if row[1]: - row[1] = '{}'.format( - task_link_template.format(project='*', task=row[1]), row[1]) + row[1] = '{}'.format(task_link_template.format(project="*", task=row[1]), row[1]) scheduler_table += [row] # plot the already executed Tasks - executed_table = [['name', 'task id', 'started', 'finished']] + executed_table = [["name", "task id", "started", "finished"]] for executed_job in sorted(self._executed_jobs, key=lambda x: x.started, reverse=True): if not executed_job.finished: if executed_job.task_id: t = Task.get_task(task_id=executed_job.task_id) - if t.status not in ('in_progress', 'queued'): + if t.status not in ("in_progress", "queued"): executed_job.finished = t.data.completed or datetime.utcnow() elif executed_job.thread_id: # noinspection PyBroadException @@ -908,30 +917,31 @@ class TaskScheduler(BaseScheduler): pass executed_table += [ - [executed_job.name, - '{}'.format(task_link_template.format( - project='*', task=executed_job.task_id), executed_job.task_id) - if executed_job.task_id else 'function', - str(executed_job.started).split('.', 1)[0], str(executed_job.finished).split('.', 1)[0] - ] + [ + executed_job.name, + '{}'.format( + task_link_template.format(project="*", task=executed_job.task_id), executed_job.task_id + ) + if executed_job.task_id + else "function", + str(executed_job.started).split(".", 1)[0], + str(executed_job.finished).split(".", 1)[0], + ] ] self._task.get_logger().report_table( - title='Schedule Tasks', series=' ', iteration=0, - table_plot=scheduler_table - ) - self._task.get_logger().report_table( - title='Executed Tasks', series=' ', iteration=0, - table_plot=executed_table + title="Schedule Tasks", series=" ", iteration=0, table_plot=scheduler_table ) + self._task.get_logger().report_table(title="Executed Tasks", series=" ", iteration=0, table_plot=executed_table) def _launch_job_task(self, job, task_parameters=None, add_tags=None): # type: (ScheduleJob, Optional[dict], Optional[List[str]]) -> Optional[ClearmlJob] task_job = super(TaskScheduler, self)._launch_job_task(job, task_parameters=task_parameters, add_tags=add_tags) # make sure this is not a function job if task_job: - self._executed_jobs.append(ExecutedJob( - name=job.name, task_id=task_job.task_id(), started=datetime.utcnow())) + self._executed_jobs.append( + ExecutedJob(name=job.name, task_id=task_job.task_id(), started=datetime.utcnow()) + ) # add timeout check if job.get_execution_timeout(): # we should probably make sure we are not overwriting a Task @@ -943,8 +953,9 @@ class TaskScheduler(BaseScheduler): thread_job = super(TaskScheduler, self)._launch_job_function(job, func_args=func_args) # make sure this is not a function job if thread_job: - self._executed_jobs.append(ExecutedJob( - name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow())) + self._executed_jobs.append( + ExecutedJob(name=job.name, thread_id=str(thread_job.ident), started=datetime.utcnow()) + ) # execution timeout is not supported with function callbacks. return thread_job