Move Task.set_resource_monitor_iteration_timeout() to a class method, add wait_for_first_iteration_to_start_sec and max_wait_for_first_iteration_to_start_sec arguments

This commit is contained in:
allegroai 2024-05-17 10:13:50 +03:00
parent 3fde18f6f6
commit a966e4afd0
2 changed files with 105 additions and 21 deletions

View File

@ -248,7 +248,7 @@ class Task(_Task):
output_uri=None, # type: Optional[Union[str, bool]] output_uri=None, # type: Optional[Union[str, bool]]
auto_connect_arg_parser=True, # type: Union[bool, Mapping[str, bool]] auto_connect_arg_parser=True, # type: Union[bool, Mapping[str, bool]]
auto_connect_frameworks=True, # type: Union[bool, Mapping[str, Union[bool, str, list]]] auto_connect_frameworks=True, # type: Union[bool, Mapping[str, Union[bool, str, list]]]
auto_resource_monitoring=True, # type: bool auto_resource_monitoring=True, # type: Union[bool, Mapping[str, Any]]
auto_connect_streams=True, # type: Union[bool, Mapping[str, bool]] auto_connect_streams=True, # type: Union[bool, Mapping[str, bool]]
deferred_init=False, # type: bool deferred_init=False, # type: bool
): ):
@ -411,6 +411,19 @@ class Task(_Task):
- ``True`` - Automatically create resource monitoring plots. (default) - ``True`` - Automatically create resource monitoring plots. (default)
- ``False`` - Do not automatically create. - ``False`` - Do not automatically create.
- Class Type - Create ResourceMonitor object of the specified class type. - Class Type - Create ResourceMonitor object of the specified class type.
- dict - Dictionary of kwargs to be passed to the ResourceMonitor instance.
The keys can be:
- `report_start_sec` OR `first_report_sec` OR `seconds_from_start` - Maximum number of seconds
to wait for scalar/plot reporting before defaulting
to machine statistics reporting based on seconds from experiment start time
- `wait_for_first_iteration_to_start_sec` - Set the initial time (seconds) to wait for iteration
reporting to be used as x-axis for the resource monitoring;
if the timeout is exceeded, the monitor reverts to `seconds_from_start`
- `max_wait_for_first_iteration_to_start_sec` - Set the maximum time (seconds) to allow the resource
monitoring to revert back to iteration reporting x-axis after starting to report `seconds_from_start`
- `report_mem_used_per_process` OR `report_global_mem_used` - Compatibility feature:
report memory usage for the entire machine.
Default (false): report only on the running process and its sub-processes
:param auto_connect_streams: Control the automatic logging of stdout and stderr. :param auto_connect_streams: Control the automatic logging of stdout and stderr.
The values are: The values are:
@ -729,14 +742,31 @@ class Task(_Task):
if auto_resource_monitoring and not is_sub_process_task_id: if auto_resource_monitoring and not is_sub_process_task_id:
resource_monitor_cls = auto_resource_monitoring \ resource_monitor_cls = auto_resource_monitoring \
if isinstance(auto_resource_monitoring, six.class_types) else ResourceMonitor if isinstance(auto_resource_monitoring, six.class_types) else ResourceMonitor
resource_monitor_kwargs = dict(
report_mem_used_per_process=not config.get("development.worker.report_global_mem_used", False),
first_report_sec=config.get("development.worker.report_start_sec", None),
wait_for_first_iteration_to_start_sec=config.get(
"development.worker.wait_for_first_iteration_to_start_sec", None
),
max_wait_for_first_iteration_to_start_sec=config.get(
"development.worker.max_wait_for_first_iteration_to_start_sec", None
),
)
if isinstance(auto_resource_monitoring, dict):
if "report_start_sec" in auto_resource_monitoring:
auto_resource_monitoring["first_report_sec"] = auto_resource_monitoring.pop("report_start_sec")
if "seconds_from_start" in auto_resource_monitoring:
auto_resource_monitoring["first_report_sec"] = auto_resource_monitoring.pop(
"seconds_from_start"
)
if "report_global_mem_used" in auto_resource_monitoring:
auto_resource_monitoring["report_mem_used_per_process"] = auto_resource_monitoring.pop(
"report_global_mem_used"
)
resource_monitor_kwargs.update(auto_resource_monitoring)
task._resource_monitor = resource_monitor_cls( task._resource_monitor = resource_monitor_cls(
task, task,
report_mem_used_per_process=not config.get('development.worker.report_global_mem_used', False), **resource_monitor_kwargs
first_report_sec=config.get('development.worker.report_start_sec', None),
wait_for_first_iteration_to_start_sec=config.get(
'development.worker.wait_for_first_iteration_to_start_sec', None),
max_wait_for_first_iteration_to_start_sec=config.get(
'development.worker.max_wait_for_first_iteration_to_start_sec', None),
) )
task._resource_monitor.start() task._resource_monitor.start()
@ -2841,21 +2871,47 @@ class Task(_Task):
docker_setup_bash_script=docker_setup_bash_script docker_setup_bash_script=docker_setup_bash_script
) )
def set_resource_monitor_iteration_timeout(self, seconds_from_start=1800): @classmethod
# type: (float) -> bool def set_resource_monitor_iteration_timeout(
cls,
seconds_from_start=30.0,
wait_for_first_iteration_to_start_sec=180.0,
max_wait_for_first_iteration_to_start_sec=1800.0,
):
# type: (float, float, float) -> bool
""" """
Set the ResourceMonitor maximum duration (in seconds) to wait until first scalar/plot is reported. Set the ResourceMonitor maximum duration (in seconds) to wait until first scalar/plot is reported.
If timeout is reached without any reporting, the ResourceMonitor will start reporting machine statistics based If timeout is reached without any reporting, the ResourceMonitor will start reporting machine statistics based
on seconds from Task start time (instead of based on iteration) on seconds from Task start time (instead of based on iteration).
Note: this method should be called before `Task.init`.
:param seconds_from_start: Maximum number of seconds to wait for scalar/plot reporting before defaulting :param seconds_from_start: Maximum number of seconds to wait for scalar/plot reporting before defaulting
to machine statistics reporting based on seconds from experiment start time to machine statistics reporting based on seconds from experiment start time
:param wait_for_first_iteration_to_start_sec: Set the initial time (seconds) to wait for iteration reporting
to be used as x-axis for the resource monitoring; if the timeout is exceeded, reverts to `seconds_from_start`
:param max_wait_for_first_iteration_to_start_sec: Set the maximum time (seconds) to allow the resource
monitoring to revert back to iteration reporting x-axis after starting to report `seconds_from_start`
:return: True if success :return: True if success
""" """
if not self._resource_monitor: if ResourceMonitor._resource_monitor_instances:
return False getLogger().warning(
self._resource_monitor.wait_for_first_iteration = seconds_from_start "Task.set_resource_monitor_iteration_timeout called after Task.init."
self._resource_monitor.max_check_first_iteration = seconds_from_start " This might not work since the values might not be used in forked processes"
)
# noinspection PyProtectedMember
for instance in ResourceMonitor._resource_monitor_instances:
# noinspection PyProtectedMember
instance._first_report_sec = seconds_from_start
instance.wait_for_first_iteration = wait_for_first_iteration_to_start_sec
instance.max_check_first_iteration = max_wait_for_first_iteration_to_start_sec
# noinspection PyProtectedMember
ResourceMonitor._first_report_sec_default = seconds_from_start
# noinspection PyProtectedMember
ResourceMonitor._wait_for_first_iteration_to_start_sec_default = wait_for_first_iteration_to_start_sec
# noinspection PyProtectedMember
ResourceMonitor._max_wait_for_first_iteration_to_start_sec_default = max_wait_for_first_iteration_to_start_sec
return True return True
def execute_remotely(self, queue_name=None, clone=False, exit_process=True): def execute_remotely(self, queue_name=None, clone=False, exit_process=True):

View File

@ -23,19 +23,47 @@ except ImportError:
class ResourceMonitor(BackgroundMonitor): class ResourceMonitor(BackgroundMonitor):
_title_machine = ':monitor:machine' _title_machine = ':monitor:machine'
_title_gpu = ':monitor:gpu' _title_gpu = ':monitor:gpu'
_first_report_sec_default = 30.0
_wait_for_first_iteration_to_start_sec_default = 180.0
_max_wait_for_first_iteration_to_start_sec_default = 1800.0
_resource_monitor_instances = []
def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30., def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
first_report_sec=None, wait_for_first_iteration_to_start_sec=180.0, first_report_sec=None, wait_for_first_iteration_to_start_sec=None,
max_wait_for_first_iteration_to_start_sec=1800., report_mem_used_per_process=True): max_wait_for_first_iteration_to_start_sec=None, report_mem_used_per_process=True):
super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec) super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec)
# noinspection PyProtectedMember
ResourceMonitor._resource_monitor_instances.append(self)
self._task = task self._task = task
self._sample_frequency = sample_frequency_per_sec self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec self._report_frequency = report_frequency_sec
self._first_report_sec = first_report_sec or report_frequency_sec # noinspection PyProtectedMember
self.wait_for_first_iteration = 180. if wait_for_first_iteration_to_start_sec is None \ self._first_report_sec = next(
else wait_for_first_iteration_to_start_sec value
self.max_check_first_iteration = 1800. if max_wait_for_first_iteration_to_start_sec is None \ # noinspection PyProtectedMember
else max_wait_for_first_iteration_to_start_sec for value in (first_report_sec, ResourceMonitor._first_report_sec_default, report_frequency_sec)
if value is not None
)
self.wait_for_first_iteration = next(
value
for value in (
wait_for_first_iteration_to_start_sec,
# noinspection PyProtectedMember
ResourceMonitor._wait_for_first_iteration_to_start_sec_default,
0.0
)
if value is not None
)
self.max_check_first_iteration = next(
value
for value in (
max_wait_for_first_iteration_to_start_sec,
# noinspection PyProtectedMember
ResourceMonitor._max_wait_for_first_iteration_to_start_sec_default,
0.0
)
if value is not None
)
self._num_readouts = 0 self._num_readouts = 0
self._readouts = {} self._readouts = {}
self._previous_readouts = {} self._previous_readouts = {}