mirror of
https://github.com/clearml/clearml
synced 2025-06-10 00:25:53 +00:00
Fix TaskScheduler initial scheduled Task could be closer to starting date
This commit is contained in:
parent
487fe75946
commit
aff7f87975
@ -76,6 +76,7 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
_next_run = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
|
_next_run = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
|
||||||
_execution_timeout = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
|
_execution_timeout = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
|
||||||
_last_executed = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
|
_last_executed = attrib(type=datetime, converter=datetime_from_isoformat, default=None)
|
||||||
|
_schedule_counter = attrib(type=int, default=0)
|
||||||
|
|
||||||
def verify(self):
|
def verify(self):
|
||||||
# type: () -> None
|
# type: () -> None
|
||||||
@ -143,6 +144,7 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
|
|
||||||
return self._next_run
|
return self._next_run
|
||||||
|
|
||||||
|
# check if we have a specific day of the week
|
||||||
weekday = None
|
weekday = None
|
||||||
if self.weekdays:
|
if self.weekdays:
|
||||||
# get previous weekday
|
# get previous weekday
|
||||||
@ -154,8 +156,27 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
prev_weekday_ind = -1
|
prev_weekday_ind = -1
|
||||||
weekday = _weekdays[(prev_weekday_ind+1) % len(_weekdays)]
|
weekday = _weekdays[(prev_weekday_ind+1) % len(_weekdays)]
|
||||||
|
|
||||||
# check if we have a specific day of the week
|
|
||||||
prev_timestamp = self._last_executed or self.starting_time
|
prev_timestamp = self._last_executed or self.starting_time
|
||||||
|
# fix first scheduled job should be as close as possible to starting time
|
||||||
|
if self._schedule_counter < 1:
|
||||||
|
# we should get here the first time we need to schedule a job, after that the delta is fixed
|
||||||
|
# If we have execute_immediately we need to get here after the first execution
|
||||||
|
# (so even through we have self._last_executed)
|
||||||
|
|
||||||
|
# if this is a daily schedule and we can still run it today, then we should
|
||||||
|
run0 = self._calc_next_run(self.starting_time, weekday)
|
||||||
|
run1 = self._calc_next_run(run0, weekday)
|
||||||
|
delta = run1-run0
|
||||||
|
optional_first_timestamp = self._calc_next_run(prev_timestamp-delta, weekday)
|
||||||
|
if optional_first_timestamp > prev_timestamp:
|
||||||
|
# this is us, we can still run it
|
||||||
|
self._next_run = optional_first_timestamp
|
||||||
|
return self._next_run
|
||||||
|
|
||||||
|
self._next_run = self._calc_next_run(prev_timestamp, weekday)
|
||||||
|
return self._next_run
|
||||||
|
|
||||||
|
def _calc_next_run(self, prev_timestamp, weekday):
|
||||||
# make sure that if we have a specific day we zero the minutes/hours/seconds
|
# make sure that if we have a specific day we zero the minutes/hours/seconds
|
||||||
if self.year:
|
if self.year:
|
||||||
prev_timestamp = datetime(
|
prev_timestamp = datetime(
|
||||||
@ -199,8 +220,9 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
minutes=self.minute or 0,
|
minutes=self.minute or 0,
|
||||||
weekday=weekday
|
weekday=weekday
|
||||||
)
|
)
|
||||||
self._next_run = next_timestamp
|
|
||||||
return self._next_run
|
return next_timestamp
|
||||||
|
|
||||||
elif self.day is not None and weekday is not None:
|
elif self.day is not None and weekday is not None:
|
||||||
# push to the next day (so we only have once a day)
|
# push to the next day (so we only have once a day)
|
||||||
prev_timestamp = datetime(
|
prev_timestamp = datetime(
|
||||||
@ -209,14 +231,14 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
day=prev_timestamp.day,
|
day=prev_timestamp.day,
|
||||||
) + relativedelta(days=1)
|
) + relativedelta(days=1)
|
||||||
elif self.day:
|
elif self.day:
|
||||||
# reset minutes in the hour (we will be adding additional hour anyhow
|
# reset minutes in the hour (we will be adding additional hour/minute anyhow)
|
||||||
prev_timestamp = datetime(
|
prev_timestamp = datetime(
|
||||||
year=prev_timestamp.year,
|
year=prev_timestamp.year,
|
||||||
month=prev_timestamp.month,
|
month=prev_timestamp.month,
|
||||||
day=prev_timestamp.day,
|
day=prev_timestamp.day,
|
||||||
)
|
)
|
||||||
elif self.hour:
|
elif self.hour:
|
||||||
# reset minutes in the hour (we will be adding additional hour anyhow
|
# reset minutes in the hour (we will be adding additional minutes anyhow)
|
||||||
prev_timestamp = datetime(
|
prev_timestamp = datetime(
|
||||||
year=prev_timestamp.year,
|
year=prev_timestamp.year,
|
||||||
month=prev_timestamp.month,
|
month=prev_timestamp.month,
|
||||||
@ -224,7 +246,7 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
hour=prev_timestamp.hour,
|
hour=prev_timestamp.hour,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._next_run = prev_timestamp + relativedelta(
|
return prev_timestamp + relativedelta(
|
||||||
years=self.year or 0,
|
years=self.year or 0,
|
||||||
months=0 if self.year else (self.month or 0),
|
months=0 if self.year else (self.month or 0),
|
||||||
days=0 if self.month or self.year else ((self.day or 0) if weekday is None else 0),
|
days=0 if self.month or self.year else ((self.day or 0) if weekday is None else 0),
|
||||||
@ -232,11 +254,13 @@ class ScheduleJob(BaseScheduleJob):
|
|||||||
minutes=self.minute or 0,
|
minutes=self.minute or 0,
|
||||||
weekday=weekday,
|
weekday=weekday,
|
||||||
)
|
)
|
||||||
return self._next_run
|
|
||||||
|
|
||||||
def run(self, task_id):
|
def run(self, task_id):
|
||||||
# type: (Optional[str]) -> datetime
|
# type: (Optional[str]) -> datetime
|
||||||
super(ScheduleJob, self).run(task_id)
|
super(ScheduleJob, self).run(task_id)
|
||||||
|
if self._last_executed or self.starting_time != datetime.fromtimestamp(0):
|
||||||
|
self._schedule_counter += 1
|
||||||
|
|
||||||
self._last_executed = datetime.utcnow()
|
self._last_executed = datetime.utcnow()
|
||||||
if self.execution_limit_hours and task_id:
|
if self.execution_limit_hours and task_id:
|
||||||
self._execution_timeout = self._last_executed + relativedelta(
|
self._execution_timeout = self._last_executed + relativedelta(
|
||||||
@ -689,7 +713,8 @@ class TaskScheduler(BaseScheduler):
|
|||||||
if sleep_time > 0:
|
if sleep_time > 0:
|
||||||
# sleep until we need to run a job or maximum sleep time
|
# sleep until we need to run a job or maximum sleep time
|
||||||
seconds = min(sleep_time, 60. * self._sync_frequency_minutes)
|
seconds = min(sleep_time, 60. * self._sync_frequency_minutes)
|
||||||
self._log('Waiting for next run, sleeping for {:.2f} minutes, until next sync.'.format(seconds / 60.))
|
self._log('Waiting for next run [UTC {}], sleeping for {:.2f} minutes, until next sync.'.format(
|
||||||
|
next_time_stamp, seconds / 60.))
|
||||||
sleep(seconds)
|
sleep(seconds)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user