Merge branch 'master' of https://github.com/allegroai/clearml
@ -20,12 +20,12 @@ from ..task import Task
|
||||
from ..backend_api.services import tasks as tasks_service
|
||||
|
||||
|
||||
logger = getLogger('clearml.automation.job')
|
||||
logger = getLogger("clearml.automation.job")
|
||||
|
||||
|
||||
class BaseJob(object):
|
||||
_job_hash_description = 'job_hash={}'
|
||||
_job_hash_property = 'pipeline_job_hash'
|
||||
_job_hash_description = "job_hash={}"
|
||||
_job_hash_property = "pipeline_job_hash"
|
||||
_hashing_callback = None
|
||||
_last_batch_status_update_ts = 0
|
||||
|
||||
@ -58,19 +58,22 @@ class BaseJob(object):
|
||||
id=[self.task.id],
|
||||
page=0,
|
||||
page_size=1,
|
||||
only_fields=['id', ] + metrics
|
||||
only_fields=[
|
||||
"id",
|
||||
]
|
||||
+ metrics,
|
||||
)
|
||||
)
|
||||
response = res.wait()
|
||||
|
||||
return tuple(response.response_data['tasks'][0]['last_metrics'][title][series][v] for v in values)
|
||||
return tuple(response.response_data["tasks"][0]["last_metrics"][title][series][v] for v in values)
|
||||
|
||||
@staticmethod
|
||||
def get_metric_req_params(title, series):
|
||||
title = hashlib.md5(str(title).encode('utf-8')).hexdigest()
|
||||
series = hashlib.md5(str(series).encode('utf-8')).hexdigest()
|
||||
metric = 'last_metrics.{}.{}.'.format(title, series)
|
||||
values = ['min_value', 'max_value', 'value']
|
||||
title = hashlib.md5(str(title).encode("utf-8")).hexdigest()
|
||||
series = hashlib.md5(str(series).encode("utf-8")).hexdigest()
|
||||
metric = "last_metrics.{}.{}.".format(title, series)
|
||||
values = ["min_value", "max_value", "value"]
|
||||
metrics = [metric + v for v in values]
|
||||
return metrics, title, series, values
|
||||
|
||||
@ -89,7 +92,7 @@ class BaseJob(object):
|
||||
Task.enqueue(task=self.task, queue_name=queue_name)
|
||||
return True
|
||||
except Exception as ex:
|
||||
logger.warning('Error enqueuing Task {} to {}: {}'.format(self.task, queue_name, ex))
|
||||
logger.warning("Error enqueuing Task {} to {}: {}".format(self.task, queue_name, ex))
|
||||
return False
|
||||
|
||||
def abort(self):
|
||||
@ -156,7 +159,7 @@ class BaseJob(object):
|
||||
|
||||
:return: Task status Task.TaskStatusEnum in string.
|
||||
"""
|
||||
if self._last_status and not force and time() - self._last_status_ts < 1.:
|
||||
if self._last_status and not force and time() - self._last_status_ts < 1.0:
|
||||
return self._last_status
|
||||
|
||||
self._last_status = self.task.status
|
||||
@ -205,7 +208,7 @@ class BaseJob(object):
|
||||
# noinspection PyProtectedMember
|
||||
id_map[task_id]._last_status_ts = last_batch_update_ts
|
||||
|
||||
def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False):
|
||||
def wait(self, timeout=None, pool_period=30.0, aborted_nonresponsive_as_running=False):
|
||||
# type: (Optional[float], float, bool) -> bool
|
||||
"""
|
||||
Wait until the task is fully executed (i.e., aborted/completed/failed)
|
||||
@ -218,7 +221,7 @@ class BaseJob(object):
|
||||
:return: True, if Task finished.
|
||||
"""
|
||||
tic = time()
|
||||
while timeout is None or time() - tic < timeout * 60.:
|
||||
while timeout is None or time() - tic < timeout * 60.0:
|
||||
if self.is_stopped(aborted_nonresponsive_as_running=aborted_nonresponsive_as_running):
|
||||
return True
|
||||
sleep(pool_period)
|
||||
@ -274,16 +277,23 @@ class BaseJob(object):
|
||||
"""
|
||||
task_status = self.status()
|
||||
# check if we are Not in any of the non-running states
|
||||
if task_status not in (Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed,
|
||||
Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published):
|
||||
if task_status not in (
|
||||
Task.TaskStatusEnum.stopped,
|
||||
Task.TaskStatusEnum.completed,
|
||||
Task.TaskStatusEnum.failed,
|
||||
Task.TaskStatusEnum.published,
|
||||
):
|
||||
return False
|
||||
|
||||
# notice the status update also refresh the "status_message" field on the Task
|
||||
|
||||
# if we are stopped but the message says "non-responsive" it means for some reason the
|
||||
# Task's instance was killed, we should ignore it if requested because we assume someone will bring it back
|
||||
if aborted_nonresponsive_as_running and task_status == Task.TaskStatusEnum.stopped and \
|
||||
str(self.task.data.status_message).lower() == "forced stop (non-responsive)":
|
||||
if (
|
||||
aborted_nonresponsive_as_running
|
||||
and task_status == Task.TaskStatusEnum.stopped
|
||||
and str(self.task.data.status_message).lower() == "forced stop (non-responsive)"
|
||||
):
|
||||
# if we are here it means the state is "stopped" but we should ignore it
|
||||
# because the non-responsive watchdog set it. We assume someone (autoscaler) will relaunch it.
|
||||
return False
|
||||
@ -298,7 +308,7 @@ class BaseJob(object):
|
||||
|
||||
:return: True the task is currently in failed state
|
||||
"""
|
||||
return self.status() in (Task.TaskStatusEnum.failed, )
|
||||
return self.status() in (Task.TaskStatusEnum.failed,)
|
||||
|
||||
def is_completed(self):
|
||||
# type: () -> bool
|
||||
@ -316,7 +326,7 @@ class BaseJob(object):
|
||||
|
||||
:return: True the task is currently in aborted state
|
||||
"""
|
||||
return self.status() in (Task.TaskStatusEnum.stopped, )
|
||||
return self.status() in (Task.TaskStatusEnum.stopped,)
|
||||
|
||||
def is_pending(self):
|
||||
# type: () -> bool
|
||||
@ -334,8 +344,7 @@ class BaseJob(object):
|
||||
|
||||
:return: False, if the task is currently in draft mode or pending.
|
||||
"""
|
||||
if not self.task_started and self.task.status in (
|
||||
Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created):
|
||||
if not self.task_started and self.task.status in (Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created):
|
||||
return False
|
||||
|
||||
self.task_started = True
|
||||
@ -378,12 +387,12 @@ class BaseJob(object):
|
||||
|
||||
@classmethod
|
||||
def _create_task_hash(
|
||||
cls,
|
||||
task,
|
||||
section_overrides=None,
|
||||
params_override=None,
|
||||
configurations_override=None,
|
||||
explicit_docker_image=None
|
||||
cls,
|
||||
task,
|
||||
section_overrides=None,
|
||||
params_override=None,
|
||||
configurations_override=None,
|
||||
explicit_docker_image=None,
|
||||
):
|
||||
# type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str]
|
||||
"""
|
||||
@ -466,22 +475,21 @@ class BaseJob(object):
|
||||
"""
|
||||
if not task_hash:
|
||||
return None
|
||||
if Session.check_min_api_version('2.13'):
|
||||
if Session.check_min_api_version("2.13"):
|
||||
# noinspection PyProtectedMember
|
||||
potential_tasks = Task._query_tasks(
|
||||
status=['completed', 'published'],
|
||||
system_tags=['-{}'.format(Task.archived_tag)],
|
||||
_all_=dict(fields=['runtime.{}'.format(cls._job_hash_property)],
|
||||
pattern=exact_match_regex(task_hash)),
|
||||
only_fields=['id'],
|
||||
status=["completed", "published"],
|
||||
system_tags=["-{}".format(Task.archived_tag)],
|
||||
_all_=dict(fields=["runtime.{}".format(cls._job_hash_property)], pattern=exact_match_regex(task_hash)),
|
||||
only_fields=["id"],
|
||||
)
|
||||
else:
|
||||
# noinspection PyProtectedMember
|
||||
potential_tasks = Task._query_tasks(
|
||||
status=['completed', 'published'],
|
||||
system_tags=['-{}'.format(Task.archived_tag)],
|
||||
_all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)),
|
||||
only_fields=['id'],
|
||||
status=["completed", "published"],
|
||||
system_tags=["-{}".format(Task.archived_tag)],
|
||||
_all_=dict(fields=["comment"], pattern=cls._job_hash_description.format(task_hash)),
|
||||
only_fields=["id"],
|
||||
)
|
||||
for obj in potential_tasks:
|
||||
task = Task.get_task(task_id=obj.id)
|
||||
@ -500,30 +508,29 @@ class BaseJob(object):
|
||||
return
|
||||
if not task_hash:
|
||||
task_hash = cls._create_task_hash(task=task)
|
||||
if Session.check_min_api_version('2.13'):
|
||||
if Session.check_min_api_version("2.13"):
|
||||
# noinspection PyProtectedMember
|
||||
task._set_runtime_properties(runtime_properties={cls._job_hash_property: str(task_hash)})
|
||||
else:
|
||||
hash_comment = cls._job_hash_description.format(task_hash) + '\n'
|
||||
task.set_comment(task.comment + '\n' + hash_comment if task.comment else hash_comment)
|
||||
hash_comment = cls._job_hash_description.format(task_hash) + "\n"
|
||||
task.set_comment(task.comment + "\n" + hash_comment if task.comment else hash_comment)
|
||||
|
||||
|
||||
class ClearmlJob(BaseJob):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_task_id, # type: str
|
||||
parameter_override=None, # type: Optional[Mapping[str, str]]
|
||||
task_overrides=None, # type: Optional[Mapping[str, str]]
|
||||
configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]]
|
||||
tags=None, # type: Optional[Sequence[str]]
|
||||
parent=None, # type: Optional[str]
|
||||
disable_clone_task=False, # type: bool
|
||||
allow_caching=False, # type: bool
|
||||
target_project=None, # type: Optional[str]
|
||||
output_uri=None, # type: Optional[Union[str, bool]]
|
||||
enable_local_imports=True, # type: bool
|
||||
**kwargs # type: Any
|
||||
self,
|
||||
base_task_id, # type: str
|
||||
parameter_override=None, # type: Optional[Mapping[str, str]]
|
||||
task_overrides=None, # type: Optional[Mapping[str, str]]
|
||||
configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]]
|
||||
tags=None, # type: Optional[Sequence[str]]
|
||||
parent=None, # type: Optional[str]
|
||||
disable_clone_task=False, # type: bool
|
||||
allow_caching=False, # type: bool
|
||||
target_project=None, # type: Optional[str]
|
||||
output_uri=None, # type: Optional[Union[str, bool]]
|
||||
enable_local_imports=True, # type: bool
|
||||
**kwargs # type: Any
|
||||
):
|
||||
# type: (...) -> ()
|
||||
"""
|
||||
@ -561,8 +568,10 @@ class ClearmlJob(BaseJob):
|
||||
self.task = base_temp_task
|
||||
task_status = self.task.status
|
||||
if task_status != Task.TaskStatusEnum.created:
|
||||
logger.warning('Task cloning disabled but requested Task [{}] status={}. '
|
||||
'Reverting to clone Task'.format(base_task_id, task_status))
|
||||
logger.warning(
|
||||
"Task cloning disabled but requested Task [{}] status={}. "
|
||||
"Reverting to clone Task".format(base_task_id, task_status)
|
||||
)
|
||||
disable_clone_task = False
|
||||
self.task = None
|
||||
elif parent:
|
||||
@ -582,14 +591,16 @@ class ClearmlJob(BaseJob):
|
||||
task_configurations = deepcopy(base_temp_task.data.configuration or {})
|
||||
for k, v in configuration_overrides.items():
|
||||
if not isinstance(v, (str, dict)):
|
||||
raise ValueError('Configuration override dictionary value must be wither str or dict, '
|
||||
'got {} instead'.format(type(v)))
|
||||
raise ValueError(
|
||||
"Configuration override dictionary value must be wither str or dict, "
|
||||
"got {} instead".format(type(v))
|
||||
)
|
||||
value = v if isinstance(v, str) else json.dumps(v)
|
||||
if k in task_configurations:
|
||||
task_configurations[k].value = value
|
||||
else:
|
||||
task_configurations[k] = tasks_service.ConfigurationItem(
|
||||
name=str(k), value=value, description=None, type='json' if isinstance(v, dict) else None
|
||||
name=str(k), value=value, description=None, type="json" if isinstance(v, dict) else None
|
||||
)
|
||||
configuration_overrides = {k: v.value for k, v in task_configurations.items()}
|
||||
|
||||
@ -600,7 +611,7 @@ class ClearmlJob(BaseJob):
|
||||
# notice we can allow ourselves to change the base-task object as we will not use it any further
|
||||
# noinspection PyProtectedMember
|
||||
base_temp_task._set_task_property(k, v, raise_on_error=False, log_on_error=True)
|
||||
section = k.split('.')[0]
|
||||
section = k.split(".")[0]
|
||||
sections[section] = getattr(base_temp_task.data, section, None)
|
||||
|
||||
# check cached task
|
||||
@ -614,7 +625,7 @@ class ClearmlJob(BaseJob):
|
||||
section_overrides=sections,
|
||||
params_override=task_params,
|
||||
configurations_override=configuration_overrides or None,
|
||||
explicit_docker_image=kwargs.get("explicit_docker_image")
|
||||
explicit_docker_image=kwargs.get("explicit_docker_image"),
|
||||
)
|
||||
task = self._get_cached_task(task_hash)
|
||||
# if we found a task, just use
|
||||
@ -631,20 +642,23 @@ class ClearmlJob(BaseJob):
|
||||
return
|
||||
|
||||
# if we have target_project, remove project from kwargs if we have it.
|
||||
if target_project and 'project' in kwargs:
|
||||
if target_project and "project" in kwargs:
|
||||
logger.info(
|
||||
'target_project={} and project={} passed, using target_project.'.format(
|
||||
target_project, kwargs['project']))
|
||||
kwargs.pop('project', None)
|
||||
"target_project={} and project={} passed, using target_project.".format(
|
||||
target_project, kwargs["project"]
|
||||
)
|
||||
)
|
||||
kwargs.pop("project", None)
|
||||
|
||||
# check again if we need to clone the Task
|
||||
if not disable_clone_task:
|
||||
# noinspection PyProtectedMember
|
||||
self.task = Task.clone(
|
||||
base_task_id, parent=parent or base_task_id,
|
||||
project=get_or_create_project(
|
||||
session=Task._get_default_session(), project_name=target_project
|
||||
) if target_project else kwargs.pop('project', None),
|
||||
base_task_id,
|
||||
parent=parent or base_task_id,
|
||||
project=get_or_create_project(session=Task._get_default_session(), project_name=target_project)
|
||||
if target_project
|
||||
else kwargs.pop("project", None),
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@ -676,6 +690,7 @@ class LocalClearmlJob(ClearmlJob):
|
||||
Run jobs locally as a sub-process, use only when no agents are available (this will not use queues)
|
||||
or for debug purposes.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(LocalClearmlJob, self).__init__(*args, **kwargs)
|
||||
self._job_process = None
|
||||
@ -695,34 +710,34 @@ class LocalClearmlJob(ClearmlJob):
|
||||
|
||||
# check if standalone
|
||||
diff = self.task.data.script.diff
|
||||
if diff and not diff.lstrip().startswith('diff '):
|
||||
if diff and not diff.lstrip().startswith("diff "):
|
||||
# standalone, we need to create if
|
||||
fd, local_filename = tempfile.mkstemp(suffix='.py')
|
||||
fd, local_filename = tempfile.mkstemp(suffix=".py")
|
||||
os.close(fd)
|
||||
with open(local_filename, 'wt') as f:
|
||||
with open(local_filename, "wt") as f:
|
||||
f.write(diff)
|
||||
self._local_temp_file = local_filename
|
||||
else:
|
||||
local_filename = self.task.data.script.entry_point
|
||||
|
||||
cwd = os.path.join(os.getcwd(), self.task.data.script.working_dir or '')
|
||||
cwd = os.path.join(os.getcwd(), self.task.data.script.working_dir or "")
|
||||
# try to check based on current root repo + entrypoint
|
||||
if Task.current_task() and not (Path(cwd)/local_filename).is_file():
|
||||
working_dir = Task.current_task().data.script.working_dir or ''
|
||||
working_dir = working_dir.strip('.')
|
||||
if Task.current_task() and not (Path(cwd) / local_filename).is_file():
|
||||
working_dir = Task.current_task().data.script.working_dir or ""
|
||||
working_dir = working_dir.strip(".")
|
||||
levels = 0
|
||||
if working_dir:
|
||||
levels = 1 + sum(1 for c in working_dir if c == '/')
|
||||
cwd = os.path.abspath(os.path.join(os.getcwd(), os.sep.join(['..'] * levels))) if levels else os.getcwd()
|
||||
levels = 1 + sum(1 for c in working_dir if c == "/")
|
||||
cwd = os.path.abspath(os.path.join(os.getcwd(), os.sep.join([".."] * levels))) if levels else os.getcwd()
|
||||
cwd = os.path.join(cwd, working_dir)
|
||||
|
||||
python = sys.executable
|
||||
env = dict(**os.environ)
|
||||
env.pop('CLEARML_PROC_MASTER_ID', None)
|
||||
env.pop('TRAINS_PROC_MASTER_ID', None)
|
||||
env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
|
||||
env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
|
||||
env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
|
||||
env.pop("CLEARML_PROC_MASTER_ID", None)
|
||||
env.pop("TRAINS_PROC_MASTER_ID", None)
|
||||
env["CLEARML_TASK_ID"] = env["TRAINS_TASK_ID"] = str(self.task.id)
|
||||
env["CLEARML_LOG_TASK_TO_BACKEND"] = "1"
|
||||
env["CLEARML_SIMULATE_REMOTE_TASK"] = "1"
|
||||
try:
|
||||
if self._enable_local_imports:
|
||||
current_python_path = env.get("PYTHONPATH")
|
||||
@ -765,7 +780,7 @@ class LocalClearmlJob(ClearmlJob):
|
||||
user_aborted = False
|
||||
if self.task.status == Task.TaskStatusEnum.stopped:
|
||||
self.task.reload()
|
||||
if str(self.task.data.status_reason).lower().startswith('user aborted'):
|
||||
if str(self.task.data.status_reason).lower().startswith("user aborted"):
|
||||
user_aborted = True
|
||||
|
||||
if not user_aborted:
|
||||
@ -826,13 +841,13 @@ class _JobStub(object):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_task_id, # type: str
|
||||
parameter_override=None, # type: Optional[Mapping[str, str]]
|
||||
task_overrides=None, # type: Optional[Mapping[str, str]]
|
||||
tags=None, # type: Optional[Sequence[str]]
|
||||
**kwargs # type: Any
|
||||
):
|
||||
self,
|
||||
base_task_id, # type: str
|
||||
parameter_override=None, # type: Optional[Mapping[str, str]]
|
||||
task_overrides=None, # type: Optional[Mapping[str, str]]
|
||||
tags=None, # type: Optional[Sequence[str]]
|
||||
**kwargs # type: Any
|
||||
):
|
||||
# type: (...) -> ()
|
||||
self.task = None
|
||||
self.base_task_id = base_task_id
|
||||
@ -846,7 +861,7 @@ class _JobStub(object):
|
||||
# type: (str) -> ()
|
||||
self.iteration = 0
|
||||
self.task_started = time()
|
||||
print('launching', self.parameter_override, 'in', queue_name)
|
||||
print("launching", self.parameter_override, "in", queue_name)
|
||||
|
||||
def abort(self):
|
||||
# type: () -> ()
|
||||
@ -886,13 +901,13 @@ class _JobStub(object):
|
||||
|
||||
def task_id(self):
|
||||
# type: () -> str
|
||||
return 'stub'
|
||||
return "stub"
|
||||
|
||||
def status(self):
|
||||
# type: () -> str
|
||||
return 'in_progress'
|
||||
return "in_progress"
|
||||
|
||||
def wait(self, timeout=None, pool_period=30.):
|
||||
def wait(self, timeout=None, pool_period=30.0):
|
||||
# type: (Optional[float], float) -> bool
|
||||
"""
|
||||
Wait for the task to be processed (i.e., aborted/completed/failed)
|
||||
|
@ -22,17 +22,24 @@ except ImportError:
|
||||
|
||||
|
||||
class ResourceMonitor(BackgroundMonitor):
|
||||
_title_machine = ':monitor:machine'
|
||||
_title_gpu = ':monitor:gpu'
|
||||
_title_machine = ":monitor:machine"
|
||||
_title_gpu = ":monitor:gpu"
|
||||
_first_report_sec_default = 30.0
|
||||
_wait_for_first_iteration_to_start_sec_default = 180.0
|
||||
_max_wait_for_first_iteration_to_start_sec_default = 1800.0
|
||||
_resource_monitor_instances = []
|
||||
_multi_node_single_task = None
|
||||
|
||||
def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
|
||||
first_report_sec=None, wait_for_first_iteration_to_start_sec=None,
|
||||
max_wait_for_first_iteration_to_start_sec=None, report_mem_used_per_process=True):
|
||||
def __init__(
|
||||
self,
|
||||
task,
|
||||
sample_frequency_per_sec=2.0,
|
||||
report_frequency_sec=30.0,
|
||||
first_report_sec=None,
|
||||
wait_for_first_iteration_to_start_sec=None,
|
||||
max_wait_for_first_iteration_to_start_sec=None,
|
||||
report_mem_used_per_process=True
|
||||
):
|
||||
super(ResourceMonitor, self).__init__(task=task, wait_period=sample_frequency_per_sec)
|
||||
# noinspection PyProtectedMember
|
||||
ResourceMonitor._resource_monitor_instances.append(self)
|
||||
@ -91,14 +98,13 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
self._debug_mode = False
|
||||
|
||||
if not self._gpustat:
|
||||
self._task.get_logger().report_text('ClearML Monitor: GPU monitoring is not available')
|
||||
self._task.get_logger().report_text("ClearML Monitor: GPU monitoring is not available")
|
||||
else: # if running_remotely():
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
active_gpus = os.environ.get('NVIDIA_VISIBLE_DEVICES', '') or \
|
||||
os.environ.get('CUDA_VISIBLE_DEVICES', '')
|
||||
active_gpus = os.environ.get("NVIDIA_VISIBLE_DEVICES", "") or os.environ.get("CUDA_VISIBLE_DEVICES", "")
|
||||
if active_gpus and active_gpus != "all":
|
||||
self._active_gpus = [g.strip() for g in active_gpus.split(',')]
|
||||
self._active_gpus = [g.strip() for g in active_gpus.split(",")]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@ -128,7 +134,7 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
rank = int(os.environ.get("RANK", os.environ.get('SLURM_PROCID')) or 0)
|
||||
rank = int(os.environ.get("RANK", os.environ.get("SLURM_PROCID")) or 0)
|
||||
world_size_digits = ceil(log10(int(os.environ.get("WORLD_SIZE") or 0)))
|
||||
except Exception:
|
||||
pass
|
||||
@ -151,15 +157,16 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
pass
|
||||
|
||||
# add Task runtime_properties with the machine spec
|
||||
if Session.check_min_api_version('2.13'):
|
||||
if Session.check_min_api_version("2.13"):
|
||||
try:
|
||||
machine_spec = self._get_machine_specs()
|
||||
if machine_spec:
|
||||
# noinspection PyProtectedMember
|
||||
self._task._set_runtime_properties(runtime_properties=machine_spec)
|
||||
except Exception as ex:
|
||||
logging.getLogger('clearml.resource_monitor').debug(
|
||||
'Failed logging machine specification: {}'.format(ex))
|
||||
logging.getLogger("clearml.resource_monitor").debug(
|
||||
"Failed logging machine specification: {}".format(ex)
|
||||
)
|
||||
|
||||
# last_iteration_interval = None
|
||||
# last_iteration_ts = 0
|
||||
@ -183,14 +190,17 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
if IsTensorboardInit.tensorboard_used():
|
||||
fallback_to_sec_as_iterations = False
|
||||
elif seconds_since_started >= self.wait_for_first_iteration:
|
||||
self._task.get_logger().report_text('ClearML Monitor: Could not detect iteration reporting, '
|
||||
'falling back to iterations as seconds-from-start')
|
||||
self._task.get_logger().report_text(
|
||||
"ClearML Monitor: Could not detect iteration reporting, "
|
||||
"falling back to iterations as seconds-from-start"
|
||||
)
|
||||
fallback_to_sec_as_iterations = True
|
||||
elif fallback_to_sec_as_iterations is True and seconds_since_started <= self.max_check_first_iteration:
|
||||
if self._check_logger_reported():
|
||||
fallback_to_sec_as_iterations = False
|
||||
self._task.get_logger().report_text('ClearML Monitor: Reporting detected, '
|
||||
'reverting back to iteration based reporting')
|
||||
self._task.get_logger().report_text(
|
||||
"ClearML Monitor: Reporting detected, " "reverting back to iteration based reporting"
|
||||
)
|
||||
|
||||
clear_readouts = True
|
||||
# if we do not have last_iteration, we just use seconds as iteration
|
||||
@ -228,8 +238,8 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
# 3 digits after the dot
|
||||
value = round(v * 1000) / 1000.
|
||||
title = self._title_gpu if k.startswith('gpu_') else self._title_machine
|
||||
value = round(v * 1000) / 1000.0
|
||||
title = self._title_gpu if k.startswith("gpu_") else self._title_machine
|
||||
series = k
|
||||
if multi_node_single_task_reporting:
|
||||
if report_node_as_series:
|
||||
@ -237,22 +247,26 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
# can always check the default cpu/gpu utilization
|
||||
if rank == 0:
|
||||
self._task.get_logger().report_scalar(
|
||||
title=title, series=series,
|
||||
iteration=iteration, value=value)
|
||||
title=title, series=series, iteration=iteration, value=value
|
||||
)
|
||||
|
||||
# now let's create an additional report
|
||||
title = "{}:{}".format(":".join(title.split(":")[:-1]), series)
|
||||
series = "rank {:0{world_size_digits}d}".format(
|
||||
rank, world_size_digits=world_size_digits)
|
||||
rank, world_size_digits=world_size_digits
|
||||
)
|
||||
elif rank > 0:
|
||||
title = "{}:rank{:0{world_size_digits}d}".format(
|
||||
title, rank, world_size_digits=world_size_digits)
|
||||
title, rank, world_size_digits=world_size_digits
|
||||
)
|
||||
else:
|
||||
# for rank 0 we keep the same original report so that external services
|
||||
# can always check the default cpu/gpu utilization
|
||||
pass
|
||||
|
||||
self._task.get_logger().report_scalar(title=title, series=series, iteration=iteration, value=value)
|
||||
self._task.get_logger().report_scalar(
|
||||
title=title, series=series, iteration=iteration, value=value
|
||||
)
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
@ -269,7 +283,7 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
self._previous_readouts_ts = time()
|
||||
for k, v in readouts.items():
|
||||
# cumulative measurements
|
||||
if k.endswith('_mbs'):
|
||||
if k.endswith("_mbs"):
|
||||
v = (v - self._previous_readouts.get(k, v)) / elapsed
|
||||
|
||||
self._readouts[k] = self._readouts.get(k, 0.0) + v
|
||||
@ -296,15 +310,16 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
"cpu_usage": sum(cpu_usage) / float(len(cpu_usage)),
|
||||
}
|
||||
|
||||
bytes_per_megabyte = 1024 ** 2
|
||||
bytes_per_megabyte = 1024**2
|
||||
|
||||
def bytes_to_megabytes(x):
|
||||
return x / bytes_per_megabyte
|
||||
|
||||
virtual_memory = psutil.virtual_memory()
|
||||
# stats["memory_used_gb"] = bytes_to_megabytes(virtual_memory.used) / 1024
|
||||
stats["memory_used_gb"] = bytes_to_megabytes(
|
||||
self._get_process_used_memory() if self._process_info else virtual_memory.used) / 1024
|
||||
stats["memory_used_gb"] = (
|
||||
bytes_to_megabytes(self._get_process_used_memory() if self._process_info else virtual_memory.used) / 1024
|
||||
)
|
||||
stats["memory_free_gb"] = bytes_to_megabytes(virtual_memory.available) / 1024
|
||||
disk_use_percentage = psutil.disk_usage(Text(Path.home())).percent
|
||||
stats["disk_free_percent"] = 100.0 - disk_use_percentage
|
||||
@ -312,7 +327,7 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
if logging.root.level > logging.DEBUG: # If the logging level is bigger than debug, ignore
|
||||
# psutil.sensors_temperatures warnings
|
||||
warnings.simplefilter("ignore", category=RuntimeWarning)
|
||||
sensor_stat = (psutil.sensors_temperatures() if hasattr(psutil, "sensors_temperatures") else {})
|
||||
sensor_stat = psutil.sensors_temperatures() if hasattr(psutil, "sensors_temperatures") else {}
|
||||
if "coretemp" in sensor_stat and len(sensor_stat["coretemp"]):
|
||||
stats["cpu_temperature"] = max([float(t.current) for t in sensor_stat["coretemp"]])
|
||||
|
||||
@ -344,9 +359,10 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
# something happened and we can't use gpu stats,
|
||||
self._gpustat_fail += 1
|
||||
if self._gpustat_fail >= 3:
|
||||
msg = 'ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring'
|
||||
msg = "ClearML Monitor: GPU monitoring failed getting GPU reading, switching off GPU monitoring"
|
||||
if self._debug_mode:
|
||||
import traceback
|
||||
|
||||
msg += "\n" + traceback.format_exc()
|
||||
self._task.get_logger().report_text(msg)
|
||||
self._gpustat = None
|
||||
@ -410,9 +426,8 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
|
||||
# only run the memory usage query once per reporting period
|
||||
# because this memory query is relatively slow, and changes very little.
|
||||
if self._last_process_pool.get('cpu') and \
|
||||
(time() - self._last_process_pool['cpu'][0]) < self._report_frequency:
|
||||
return self._last_process_pool['cpu'][1]
|
||||
if self._last_process_pool.get("cpu") and (time() - self._last_process_pool["cpu"][0]) < self._report_frequency:
|
||||
return self._last_process_pool["cpu"][1]
|
||||
|
||||
# if we have no parent process, return 0 (it's an error)
|
||||
if not self._process_info:
|
||||
@ -420,7 +435,7 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
|
||||
self._last_process_id_list = []
|
||||
mem_size = mem_usage_children(0, self._process_info)
|
||||
self._last_process_pool['cpu'] = time(), mem_size
|
||||
self._last_process_pool["cpu"] = time(), mem_size
|
||||
|
||||
return mem_size
|
||||
|
||||
@ -448,9 +463,14 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
# On the rest of the samples we return the previous memory measurement
|
||||
|
||||
# update mem used by our process and sub processes
|
||||
if self._gpu_memory_per_process and self._process_info and \
|
||||
(not self._last_process_pool.get('gpu') or
|
||||
(time() - self._last_process_pool['gpu'][0]) >= self._report_frequency):
|
||||
if (
|
||||
self._gpu_memory_per_process
|
||||
and self._process_info
|
||||
and (
|
||||
not self._last_process_pool.get("gpu")
|
||||
or (time() - self._last_process_pool["gpu"][0]) >= self._report_frequency
|
||||
)
|
||||
):
|
||||
gpu_mem = {}
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
@ -469,15 +489,15 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
|
||||
gpu_mem[i] = 0
|
||||
for p in g.processes:
|
||||
if p is not None and p['pid'] in self._last_process_id_list:
|
||||
gpu_mem[i] += p.get('gpu_memory_usage', 0)
|
||||
if p is not None and p["pid"] in self._last_process_id_list:
|
||||
gpu_mem[i] += p.get("gpu_memory_usage", 0)
|
||||
|
||||
self._last_process_pool['gpu'] = time(), gpu_mem
|
||||
self._last_process_pool["gpu"] = time(), gpu_mem
|
||||
else:
|
||||
# if we do no need to update the memory usage, run global query
|
||||
# if we have no parent process (backward compatibility), return global stats
|
||||
gpu_stat = self._gpustat.new_query(per_process_stats=False)
|
||||
gpu_mem = self._last_process_pool['gpu'][1] if self._last_process_pool.get('gpu') else None
|
||||
gpu_mem = self._last_process_pool["gpu"][1] if self._last_process_pool.get("gpu") else None
|
||||
|
||||
# generate the statistics dict for actual report
|
||||
stats = {}
|
||||
@ -486,7 +506,8 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
# only monitor the active gpu's, if none were selected, monitor everything
|
||||
if self._skip_nonactive_gpu(g):
|
||||
continue
|
||||
stats["gpu_%d_temperature" % i] = float(g["temperature.gpu"])
|
||||
if g["temperature.gpu"] is not None:
|
||||
stats["gpu_%d_temperature" % i] = float(g["temperature.gpu"])
|
||||
if g["utilization.gpu"] is not None:
|
||||
stats["gpu_%d_utilization" % i] = float(g["utilization.gpu"])
|
||||
else:
|
||||
@ -505,7 +526,7 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
)
|
||||
)
|
||||
self._gpu_utilization_warning_sent = True
|
||||
stats["gpu_%d_mem_usage" % i] = 100. * float(g["memory.used"]) / float(g["memory.total"])
|
||||
stats["gpu_%d_mem_usage" % i] = 100.0 * float(g["memory.used"]) / float(g["memory.total"])
|
||||
# already in MBs
|
||||
stats["gpu_%d_mem_free_gb" % i] = float(g["memory.total"] - g["memory.used"]) / 1024
|
||||
# use previously sampled process gpu memory, or global if it does not exist
|
||||
@ -519,15 +540,15 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
specs = {
|
||||
'platform': str(sys.platform),
|
||||
'python_version': str(platform.python_version()),
|
||||
'python_exec': str(sys.executable),
|
||||
'OS': str(platform.platform(aliased=True)),
|
||||
'processor': str(platform.machine()),
|
||||
'cpu_cores': int(psutil.cpu_count()),
|
||||
'memory_gb': round(psutil.virtual_memory().total / 1024 ** 3, 1),
|
||||
'hostname': str(platform.node()),
|
||||
'gpu_count': 0,
|
||||
"platform": str(sys.platform),
|
||||
"python_version": str(platform.python_version()),
|
||||
"python_exec": str(sys.executable),
|
||||
"OS": str(platform.platform(aliased=True)),
|
||||
"processor": str(platform.machine()),
|
||||
"cpu_cores": int(psutil.cpu_count()),
|
||||
"memory_gb": round(psutil.virtual_memory().total / 1024**3, 1),
|
||||
"hostname": str(platform.node()),
|
||||
"gpu_count": 0,
|
||||
}
|
||||
if self._gpustat:
|
||||
gpu_stat = self._gpustat.new_query(shutdown=True, get_driver_info=True)
|
||||
@ -535,10 +556,10 @@ class ResourceMonitor(BackgroundMonitor):
|
||||
gpus = [g for i, g in enumerate(gpu_stat.gpus) if not self._skip_nonactive_gpu(g)]
|
||||
specs.update(
|
||||
gpu_count=int(len(gpus)),
|
||||
gpu_type=', '.join(g.name for g in gpus),
|
||||
gpu_memory=', '.join('{}GB'.format(round(g.memory_total / 1024.0)) for g in gpus),
|
||||
gpu_driver_version=gpu_stat.driver_version or '',
|
||||
gpu_driver_cuda_version=gpu_stat.driver_cuda_version or '',
|
||||
gpu_type=", ".join(g.name for g in gpus),
|
||||
gpu_memory=", ".join("{}GB".format(round(g.memory_total / 1024.0)) for g in gpus),
|
||||
gpu_driver_version=gpu_stat.driver_version or "",
|
||||
gpu_driver_cuda_version=gpu_stat.driver_cuda_version or "",
|
||||
)
|
||||
|
||||
except Exception:
|
||||
|
Before Width: | Height: | Size: 238 KiB After Width: | Height: | Size: 273 KiB |
Before Width: | Height: | Size: 2.5 MiB After Width: | Height: | Size: 2.4 MiB |
Before Width: | Height: | Size: 483 KiB After Width: | Height: | Size: 424 KiB |
Before Width: | Height: | Size: 379 KiB After Width: | Height: | Size: 344 KiB |