From a966e4afd09a1beaadfe192e0b5a2b70d532eaf3 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 17 May 2024 10:13:50 +0300 Subject: [PATCH] Move `Task.set_resource_monitor_iteration_timeout()` to a class method, add `wait_for_first_iteration_to_start_sec` and `max_wait_for_first_iteration_to_start_sec` arguments --- clearml/task.py | 84 ++++++++++++++++++++++----- clearml/utilities/resource_monitor.py | 42 +++++++++++--- 2 files changed, 105 insertions(+), 21 deletions(-) diff --git a/clearml/task.py b/clearml/task.py index 16a320d9..470d92e1 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -248,7 +248,7 @@ class Task(_Task): output_uri=None, # type: Optional[Union[str, bool]] auto_connect_arg_parser=True, # type: Union[bool, Mapping[str, bool]] auto_connect_frameworks=True, # type: Union[bool, Mapping[str, Union[bool, str, list]]] - auto_resource_monitoring=True, # type: bool + auto_resource_monitoring=True, # type: Union[bool, Mapping[str, Any]] auto_connect_streams=True, # type: Union[bool, Mapping[str, bool]] deferred_init=False, # type: bool ): @@ -411,6 +411,19 @@ class Task(_Task): - ``True`` - Automatically create resource monitoring plots. (default) - ``False`` - Do not automatically create. - Class Type - Create ResourceMonitor object of the specified class type. + - dict - Dictionary of kwargs to be passed to the ResourceMonitor instance. + The keys can be: + - `report_start_sec` OR `first_report_sec` OR `seconds_from_start` - Maximum number of seconds + to wait for scalar/plot reporting before defaulting + to machine statistics reporting based on seconds from experiment start time + - `wait_for_first_iteration_to_start_sec` - Set the initial time (seconds) to wait for iteration + reporting to be used as x-axis for the resource monitoring, + if timeout exceeds then reverts to `seconds_from_start` + - `max_wait_for_first_iteration_to_start_sec` - Set the maximum time (seconds) to allow the resource + monitoring to revert back to iteration reporting x-axis after starting to report `seconds_from_start` + - `report_mem_used_per_process` OR `report_global_mem_used` - Compatibility feature, + report memory usage for the entire machine + default (false), report only on the running process and its sub-processes :param auto_connect_streams: Control the automatic logging of stdout and stderr. The values are: @@ -729,14 +742,31 @@ class Task(_Task): if auto_resource_monitoring and not is_sub_process_task_id: resource_monitor_cls = auto_resource_monitoring \ if isinstance(auto_resource_monitoring, six.class_types) else ResourceMonitor + resource_monitor_kwargs = dict( + report_mem_used_per_process=not config.get("development.worker.report_global_mem_used", False), + first_report_sec=config.get("development.worker.report_start_sec", None), + wait_for_first_iteration_to_start_sec=config.get( + "development.worker.wait_for_first_iteration_to_start_sec", None + ), + max_wait_for_first_iteration_to_start_sec=config.get( + "development.worker.max_wait_for_first_iteration_to_start_sec", None + ), + ) + if isinstance(auto_resource_monitoring, dict): + if "report_start_sec" in auto_resource_monitoring: + auto_resource_monitoring["first_report_sec"] = auto_resource_monitoring.pop("report_start_sec") + if "seconds_from_start" in auto_resource_monitoring: + auto_resource_monitoring["first_report_sec"] = auto_resource_monitoring.pop( + "seconds_from_start" + ) + if "report_global_mem_used" in auto_resource_monitoring: + auto_resource_monitoring["report_mem_used_per_process"] = auto_resource_monitoring.pop( + "report_global_mem_used" + ) + resource_monitor_kwargs.update(auto_resource_monitoring) task._resource_monitor = resource_monitor_cls( task, - report_mem_used_per_process=not config.get('development.worker.report_global_mem_used', False), - first_report_sec=config.get('development.worker.report_start_sec', None), - wait_for_first_iteration_to_start_sec=config.get( - 'development.worker.wait_for_first_iteration_to_start_sec', None), - max_wait_for_first_iteration_to_start_sec=config.get( - 'development.worker.max_wait_for_first_iteration_to_start_sec', None), + **resource_monitor_kwargs ) task._resource_monitor.start() @@ -2841,21 +2871,47 @@ class Task(_Task): docker_setup_bash_script=docker_setup_bash_script ) - def set_resource_monitor_iteration_timeout(self, seconds_from_start=1800): - # type: (float) -> bool + @classmethod + def set_resource_monitor_iteration_timeout( + cls, + seconds_from_start=30.0, + wait_for_first_iteration_to_start_sec=180.0, + max_wait_for_first_iteration_to_start_sec=1800.0, + ): + # type: (float, float, float) -> bool """ Set the ResourceMonitor maximum duration (in seconds) to wait until first scalar/plot is reported. If timeout is reached without any reporting, the ResourceMonitor will start reporting machine statistics based - on seconds from Task start time (instead of based on iteration) + on seconds from Task start time (instead of based on iteration). + Notice! Should be called before `Task.init`. :param seconds_from_start: Maximum number of seconds to wait for scalar/plot reporting before defaulting to machine statistics reporting based on seconds from experiment start time + :param wait_for_first_iteration_to_start_sec: Set the initial time (seconds) to wait for iteration reporting + to be used as x-axis for the resource monitoring, if timeout exceeds then reverts to `seconds_from_start` + :param max_wait_for_first_iteration_to_start_sec: Set the maximum time (seconds) to allow the resource + monitoring to revert back to iteration reporting x-axis after starting to report `seconds_from_start` + :return: True if success """ - if not self._resource_monitor: - return False - self._resource_monitor.wait_for_first_iteration = seconds_from_start - self._resource_monitor.max_check_first_iteration = seconds_from_start + if ResourceMonitor._resource_monitor_instances: + getLogger().warning( + "Task.set_resource_monitor_iteration_timeout called after Task.init." + " This might not work since the values might not be used in forked processes" + ) + # noinspection PyProtectedMember + for instance in ResourceMonitor._resource_monitor_instances: + # noinspection PyProtectedMember + instance._first_report_sec = seconds_from_start + instance.wait_for_first_iteration = wait_for_first_iteration_to_start_sec + instance.max_check_first_iteration = max_wait_for_first_iteration_to_start_sec + + # noinspection PyProtectedMember + ResourceMonitor._first_report_sec_default = seconds_from_start + # noinspection PyProtectedMember + ResourceMonitor._wait_for_first_iteration_to_start_sec_default = wait_for_first_iteration_to_start_sec + # noinspection PyProtectedMember + ResourceMonitor._max_wait_for_first_iteration_to_start_sec_default = max_wait_for_first_iteration_to_start_sec return True def execute_remotely(self, queue_name=None, clone=False, exit_process=True): diff --git a/clearml/utilities/resource_monitor.py b/clearml/utilities/resource_monitor.py index ad855cce..41465d6a 100644 --- a/clearml/utilities/resource_monitor.py +++ b/clearml/utilities/resource_monitor.py @@ -23,19 +23,47 @@ except ImportError: class ResourceMonitor(BackgroundMonitor): _title_machine = ':monitor:machine' _title_gpu = ':monitor:gpu' + _first_report_sec_default = 30.0 + _wait_for_first_iteration_to_start_sec_default = 180.0 + _max_wait_for_first_iteration_to_start_sec_default = 1800.0 + _resource_monitor_instances = [] def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30., - first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0, - max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True): + first_report_sec=None, wait_for_first_iteration_to_start_sec=None, + max_wait_for_first_iteration_to_start_sec=None, report_mem_used_per_process=True): super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec) + # noinspection PyProtectedMember + ResourceMonitor._resource_monitor_instances.append(self) self._task = task self._sample_frequency = sample_frequency_per_sec self._report_frequency = report_frequency_sec - self._first_report_sec = first_report_sec or report_frequency_sec - self.wait_for_first_iteration = 180. if wait_for_first_iteration_to_start_sec is None \ - else wait_for_first_iteration_to_start_sec - self.max_check_first_iteration = 1800. if max_wait_for_first_iteration_to_start_sec is None \ - else max_wait_for_first_iteration_to_start_sec + # noinspection PyProtectedMember + self._first_report_sec = next( + value + # noinspection PyProtectedMember + for value in (first_report_sec, ResourceMonitor._first_report_sec_default, report_frequency_sec) + if value is not None + ) + self.wait_for_first_iteration = next( + value + for value in ( + wait_for_first_iteration_to_start_sec, + # noinspection PyProtectedMember + ResourceMonitor._wait_for_first_iteration_to_start_sec_default, + 0.0 + ) + if value is not None + ) + self.max_check_first_iteration = next( + value + for value in ( + max_wait_for_first_iteration_to_start_sec, + # noinspection PyProtectedMember + ResourceMonitor._max_wait_for_first_iteration_to_start_sec_default, + 0.0 + ) + if value is not None + ) self._num_readouts = 0 self._readouts = {} self._previous_readouts = {}