mirror of
https://github.com/clearml/clearml
synced 2025-03-10 14:01:20 +00:00
Optimize task refresh while pulling task status in local worker and last iteration for Resource Monitoring
This commit is contained in:
parent
cc1508b2bd
commit
e3ae4f4e26
@ -30,10 +30,15 @@ class TaskStopSignal(object):
|
||||
def test(self):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
status = str(self.task.status)
|
||||
message = self.task.data.status_message
|
||||
# we use internal status read, so we do not need to constantly pull the entire task object,
|
||||
# it might be large, and we want to also avoid the edit lock on it.
|
||||
status, message = self.task._get_status()
|
||||
status = str(status)
|
||||
message = str(message)
|
||||
|
||||
if status == str(tasks.TaskStatusEnum.in_progress) and "stopping" in message:
|
||||
# make sure we syn the entire task object
|
||||
self.task.reload()
|
||||
return TaskStopReason.stopped
|
||||
|
||||
_expected_statuses = (
|
||||
@ -43,12 +48,16 @@ class TaskStopSignal(object):
|
||||
)
|
||||
|
||||
if status not in _expected_statuses and "worker" not in message:
|
||||
# make sure we syn the entire task object
|
||||
self.task.reload()
|
||||
return TaskStopReason.status_changed
|
||||
|
||||
if status == str(tasks.TaskStatusEnum.created):
|
||||
self._task_reset_state_counter += 1
|
||||
|
||||
if self._task_reset_state_counter >= self._number_of_consecutive_reset_tests:
|
||||
# make sure we syn the entire task object
|
||||
self.task.reload()
|
||||
return TaskStopReason.reset
|
||||
|
||||
self.task.log.warning(
|
||||
|
@ -815,6 +815,24 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
|
||||
self._files_server = Session.get_files_server_host()
|
||||
return self._files_server
|
||||
|
||||
def _get_status(self):
|
||||
try:
|
||||
all_tasks = self.send(
|
||||
tasks.GetAllRequest(id=[self.id], only_fields=['status', 'status_message']),
|
||||
).response.tasks
|
||||
return all_tasks[0].status, all_tasks[0].status_message
|
||||
except Exception:
|
||||
return None, None
|
||||
|
||||
def _reload_last_iteration(self):
|
||||
try:
|
||||
all_tasks = self.send(
|
||||
tasks.GetAllRequest(id=[self.id], only_fields=['last_iteration']),
|
||||
).response.tasks
|
||||
self.data.last_iteration = all_tasks[0].last_iteration
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def _get_api_server(cls):
|
||||
return Session.get_api_server_host()
|
||||
|
@ -848,7 +848,7 @@ class Task(_Task):
|
||||
|
||||
:return: last reported iteration number (integer)
|
||||
"""
|
||||
self.reload()
|
||||
self._reload_last_iteration()
|
||||
return max(self.data.last_iteration, self._reporter.max_iteration if self._reporter else 0)
|
||||
|
||||
def set_last_iteration(self, last_iteration):
|
||||
|
Loading…
Reference in New Issue
Block a user