Send task.ping for executing tasks every 120 seconds by default (the interval is configurable using the agent.task_ping_interval_sec configuration option; a value of 0 disables the ping)

This commit is contained in:
allegroai 2022-12-05 11:22:25 +02:00
parent 396abf13b6
commit 57cde21c48
2 changed files with 25 additions and 0 deletions

View File

@ -14,6 +14,14 @@ except ImportError:
ConverterType = TypeVar("ConverterType", bound=Callable[[Any], Any])
def text_to_int(value, default=0):
    # type: (Any, int) -> int
    """
    Convert ``value`` to an integer, returning ``default`` when it cannot be converted.

    :param value: Any object accepted by ``int()`` (e.g. a numeric string, int or float).
    :param default: Value returned when the conversion fails.
    :return: ``int(value)`` on success, otherwise ``default``.
    """
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or NaN; TypeError: unsupported type (e.g. None);
        # OverflowError: float infinity, which int() cannot represent.
        return default
def base64_to_text(value):
    # type: (Any) -> Text
    """
    Decode a base64-encoded value and return the result as UTF-8 text.

    :param value: Base64-encoded input passed directly to ``base64.b64decode``.
    :return: The decoded bytes interpreted as a UTF-8 string.
    """
    raw_bytes = base64.b64decode(value)
    text = raw_bytes.decode("utf-8")
    return text

View File

@ -41,6 +41,7 @@ from clearml_agent.backend_api.session.defs import (
ENV_VENV_CONFIGURED, ENV_PROPAGATE_EXITCODE, )
from clearml_agent.backend_config.defs import UptimeConf
from clearml_agent.backend_config.utils import apply_environment, apply_files
from clearml_agent.backend_config.converters import text_to_int
from clearml_agent.commands.base import resolve_names, ServiceCommandSection
from clearml_agent.commands.resolver import resolve_default_container
from clearml_agent.definitions import (
@ -686,6 +687,10 @@ class Worker(ServiceCommandSection):
else:
self._docker_args_filters = []
# Interval (seconds) between task ping requests; the configuration default is 120.
# max(0, ...) clamps negative values to 0, and a value that text_to_int cannot
# convert falls back to its default of 0. A resulting 0 is falsy, so the
# ping check in the output-polling loop is skipped (ping disabled).
self._task_ping_interval_sec = max(
0, text_to_int(self._session.config.get("agent.task_ping_interval_sec", 120.0))
)
@classmethod
def _verify_command_states(cls, kwargs):
"""
@ -1737,6 +1742,7 @@ class Worker(ServiceCommandSection):
stopping = False
status = None
process = None
last_task_ping = 0
try:
_last_machine_update_ts = time()
stop_reason = None
@ -1772,6 +1778,17 @@ class Worker(ServiceCommandSection):
if stderr:
stderr.flush()
# Periodically ping the backend for the running task. Skipped entirely when the
# configured interval is 0 (falsy).
if self._task_ping_interval_sec and time() - last_task_ping > self._task_ping_interval_sec:
    # noinspection PyBroadException
    try:
        res = (session or self._session).send(tasks_api.PingRequest(task=task_id))
        if not res:
            self.log.error("Failed sending ping for task %s: %s", task_id, res.response)
    except Exception as ex:
        # Best effort: a failed ping must never interrupt monitoring of the task process
        self.log.error("Failed sending ping: %s", str(ex))
    finally:
        # Record the time of this attempt (successful or not) so the next ping is sent
        # only after the interval elapses. This must update the local timestamp and
        # NOT self._task_ping_interval_sec: overwriting the interval with an epoch
        # timestamp destroys the configured value and causes a ping on every iteration.
        last_task_ping = time()
# get diff from previous poll
printed_lines, stdout_pos_count = _print_file(stdout_path, stdout_pos_count)
if self._services_mode and not stopping and status is None: