Changed: resource monitor now prefers syncing to iterations rather than to seconds since the beginning of the experiment

This commit is contained in:
allegroai 2019-06-27 23:22:28 +03:00
parent 85e783cc6b
commit d27dc352cb
4 changed files with 91 additions and 7 deletions

View File

@ -2,6 +2,7 @@ import base64
import sys import sys
import threading import threading
from collections import defaultdict from collections import defaultdict
from functools import partial
from logging import ERROR, WARNING, getLogger from logging import ERROR, WARNING, getLogger
from typing import Any from typing import Any
@ -21,6 +22,23 @@ except ImportError:
MessageToDict = None MessageToDict = None
class IsTensorboardInit(object):
    """
    Process-wide flag recording whether any tensorboard writer was ever created.

    The flag is flipped either explicitly (by our own event-writer wrappers) or
    implicitly, by patching the tensorboard writer constructors with
    ``_patched_tb__init__`` via ``functools.partial``.
    """

    # class-level state: True once any (patched) tensorboard writer was constructed
    _tensorboard_initialized = False

    @classmethod
    def tensorboard_used(cls):
        """Return True if tensorboard was initialized at any point in this process."""
        return cls._tensorboard_initialized

    @classmethod
    def set_tensorboard_used(cls):
        """Mark tensorboard as having been used."""
        cls._tensorboard_initialized = True

    @staticmethod
    def _patched_tb__init__(original_init, self, *args, **kwargs):
        # Constructor wrapper installed with functools.partial(original_init):
        # record the usage, then delegate to the original untouched.
        IsTensorboardInit._tensorboard_initialized = True
        return original_init(self, *args, **kwargs)
class EventTrainsWriter(object): class EventTrainsWriter(object):
""" """
TF SummaryWriter implementation that converts the tensorboard's summary into TF SummaryWriter implementation that converts the tensorboard's summary into
@ -68,6 +86,7 @@ class EventTrainsWriter(object):
:param max_keep_images: Maximum number of images to save before starting to reuse files (per title/metric pair) :param max_keep_images: Maximum number of images to save before starting to reuse files (per title/metric pair)
""" """
# We are the events_writer, so that's what we'll pass # We are the events_writer, so that's what we'll pass
IsTensorboardInit.set_tensorboard_used()
self.max_keep_images = max_keep_images self.max_keep_images = max_keep_images
self.report_freq = report_freq self.report_freq = report_freq
self.image_report_freq = image_report_freq if image_report_freq else report_freq self.image_report_freq = image_report_freq if image_report_freq else report_freq
@ -407,6 +426,7 @@ class EventTrainsWriter(object):
class ProxyEventsWriter(object): class ProxyEventsWriter(object):
def __init__(self, events): def __init__(self, events):
IsTensorboardInit.set_tensorboard_used()
self._events = events self._events = events
def _get_sentinel_event(self): def _get_sentinel_event(self):
@ -768,6 +788,10 @@ class PatchTensorFlowEager(object):
gen_summary_ops.write_image_summary = PatchTensorFlowEager._write_image_summary gen_summary_ops.write_image_summary = PatchTensorFlowEager._write_image_summary
PatchTensorFlowEager.__original_fn_hist = gen_summary_ops.write_histogram_summary PatchTensorFlowEager.__original_fn_hist = gen_summary_ops.write_histogram_summary
gen_summary_ops.write_histogram_summary = PatchTensorFlowEager._write_hist_summary gen_summary_ops.write_histogram_summary = PatchTensorFlowEager._write_hist_summary
gen_summary_ops.create_summary_file_writer = partial(IsTensorboardInit._patched_tb__init__,
gen_summary_ops.create_summary_file_writer)
gen_summary_ops.create_summary_db_writer = partial(IsTensorboardInit._patched_tb__init__,
gen_summary_ops.create_summary_db_writer)
except ImportError: except ImportError:
pass pass
except Exception as ex: except Exception as ex:

View File

@ -810,7 +810,7 @@ class OutputModel(BaseModel):
framework=self.framework or framework, framework=self.framework or framework,
comment=comment, comment=comment,
cb=delete_previous_weights_file if auto_delete_file else None, cb=delete_previous_weights_file if auto_delete_file else None,
iteration=iteration or self._task.data.last_iteration, iteration=iteration or self._task.get_last_iteration(),
) )
elif register_uri: elif register_uri:
register_uri = StorageHelper.conform_url(register_uri) register_uri = StorageHelper.conform_url(register_uri)

View File

@ -664,6 +664,27 @@ class Task(_Task):
""" """
super(Task, self).set_model_label_enumeration(enumeration=enumeration) super(Task, self).set_model_label_enumeration(enumeration=enumeration)
def get_last_iteration(self):
    """
    Return the last reported iteration, i.e. the maximum iteration for which
    the task reported a metric.

    Note: the result is never cached locally; every call queries the backend.

    :return: last reported iteration number (integer)
    """
    # refresh the task data from the backend before reading the value
    self.reload()
    return self.data.last_iteration
def set_last_iteration(self, last_iteration):
    """
    Forcefully set the last reported iteration, i.e. the maximum iteration the
    task reported a metric for.

    :param last_iteration: last reported iteration number
    :type last_iteration: integer
    """
    value = int(last_iteration)
    self.data.last_iteration = value
    # propagate the forced value to the backend record as well
    self._edit(last_iteration=value)
def _connect_output_model(self, model): def _connect_output_model(self, model):
assert isinstance(model, OutputModel) assert isinstance(model, OutputModel)
model.connect(self) model.connect(self)

View File

@ -4,6 +4,7 @@ from threading import Thread, Event
import psutil import psutil
from pathlib2 import Path from pathlib2 import Path
from typing import Text from typing import Text
from ..binding.frameworks.tensorflow_bind import IsTensorboardInit
try: try:
import gpustat import gpustat
@ -15,10 +16,13 @@ class ResourceMonitor(object):
_title_machine = ':monitor:machine' _title_machine = ':monitor:machine'
_title_gpu = ':monitor:gpu' _title_gpu = ':monitor:gpu'
def __init__(self, task, measure_frequency_times_per_sec=2., report_frequency_sec=30.): def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30.,
first_report_sec=None, wait_for_first_iteration_to_start_sec=180.):
self._task = task self._task = task
self._measure_frequency = measure_frequency_times_per_sec self._sample_frequency = sample_frequency_per_sec
self._report_frequency = report_frequency_sec self._report_frequency = report_frequency_sec
self._first_report_sec = first_report_sec or report_frequency_sec
self._wait_for_first_iteration = wait_for_first_iteration_to_start_sec
self._num_readouts = 0 self._num_readouts = 0
self._readouts = {} self._readouts = {}
self._previous_readouts = {} self._previous_readouts = {}
@ -41,11 +45,18 @@ class ResourceMonitor(object):
def _daemon(self): def _daemon(self):
logger = self._task.get_logger() logger = self._task.get_logger()
seconds_since_started = 0 seconds_since_started = 0
reported = 0
last_iteration = 0
last_iteration_ts = 0
last_iteration_interval = None
repeated_iterations = 0
fallback_to_sec_as_iterations = 0
while True: while True:
last_report = time() last_report = time()
while (time() - last_report) < self._report_frequency: current_report_frequency = self._report_frequency if reported != 0 else self._first_report_sec
# wait for self._measure_frequency seconds, if event set quit while (time() - last_report) < current_report_frequency:
if self._exit_event.wait(1.0 / self._measure_frequency): # wait for self._sample_frequency seconds, if event set quit
if self._exit_event.wait(1.0 / self._sample_frequency):
return return
# noinspection PyBroadException # noinspection PyBroadException
try: try:
@ -53,15 +64,43 @@ class ResourceMonitor(object):
except Exception: except Exception:
pass pass
reported += 1
average_readouts = self._get_average_readouts() average_readouts = self._get_average_readouts()
seconds_since_started += int(round(time() - last_report)) seconds_since_started += int(round(time() - last_report))
# check if we do not report any metric (so it means the last iteration will not be changed)
if fallback_to_sec_as_iterations is None:
if IsTensorboardInit.tensorboard_used():
fallback_to_sec_as_iterations = False
elif seconds_since_started >= self._wait_for_first_iteration:
fallback_to_sec_as_iterations = True
# if we do not have last_iteration, we just use seconds as iteration
if fallback_to_sec_as_iterations:
iteration = seconds_since_started
else:
iteration = self._task.get_last_iteration()
if iteration == last_iteration:
repeated_iterations += 1
if last_iteration_interval:
# to be on the safe side, we don't want to pass the actual next iteration
iteration += int(0.95*last_iteration_interval[0] * (seconds_since_started - last_iteration_ts)
/ last_iteration_interval[1])
else:
iteration += 1
else:
last_iteration_interval = (iteration - last_iteration, seconds_since_started - last_iteration_ts)
last_iteration_ts = seconds_since_started
last_iteration = iteration
repeated_iterations = 0
fallback_to_sec_as_iterations = False
for k, v in average_readouts.items(): for k, v in average_readouts.items():
# noinspection PyBroadException # noinspection PyBroadException
try: try:
title = self._title_gpu if k.startswith('gpu_') else self._title_machine title = self._title_gpu if k.startswith('gpu_') else self._title_machine
# 3 points after the dot # 3 points after the dot
value = round(v*1000) / 1000. value = round(v*1000) / 1000.
logger.report_scalar(title=title, series=k, iteration=seconds_since_started, value=value) logger.report_scalar(title=title, series=k, iteration=iteration, value=value)
except Exception: except Exception:
pass pass
self._clear_readouts() self._clear_readouts()