diff --git a/trains/binding/frameworks/tensorflow_bind.py b/trains/binding/frameworks/tensorflow_bind.py index 19d9616a..cac5756b 100644 --- a/trains/binding/frameworks/tensorflow_bind.py +++ b/trains/binding/frameworks/tensorflow_bind.py @@ -2,6 +2,7 @@ import base64 import sys import threading from collections import defaultdict +from functools import partial from logging import ERROR, WARNING, getLogger from typing import Any @@ -21,6 +22,23 @@ except ImportError: MessageToDict = None +class IsTensorboardInit(object): + _tensorboard_initialized = False + + @classmethod + def tensorboard_used(cls): + return cls._tensorboard_initialized + + @classmethod + def set_tensorboard_used(cls): + cls._tensorboard_initialized = True + + @staticmethod + def _patched_tb__init__(original_init, self, *args, **kwargs): + IsTensorboardInit._tensorboard_initialized = True + return original_init(self, *args, **kwargs) + + class EventTrainsWriter(object): """ TF SummaryWriter implementation that converts the tensorboard's summary into @@ -68,6 +86,7 @@ class EventTrainsWriter(object): :param max_keep_images: Maximum number of images to save before starting to reuse files (per title/metric pair) """ # We are the events_writer, so that's what we'll pass + IsTensorboardInit.set_tensorboard_used() self.max_keep_images = max_keep_images self.report_freq = report_freq self.image_report_freq = image_report_freq if image_report_freq else report_freq @@ -407,6 +426,7 @@ class EventTrainsWriter(object): class ProxyEventsWriter(object): def __init__(self, events): + IsTensorboardInit.set_tensorboard_used() self._events = events def _get_sentinel_event(self): @@ -768,6 +788,10 @@ class PatchTensorFlowEager(object): gen_summary_ops.write_image_summary = PatchTensorFlowEager._write_image_summary PatchTensorFlowEager.__original_fn_hist = gen_summary_ops.write_histogram_summary gen_summary_ops.write_histogram_summary = PatchTensorFlowEager._write_hist_summary + gen_summary_ops.create_summary_file_writer = partial(IsTensorboardInit._patched_tb__init__, + gen_summary_ops.create_summary_file_writer) + gen_summary_ops.create_summary_db_writer = partial(IsTensorboardInit._patched_tb__init__, + gen_summary_ops.create_summary_db_writer) except ImportError: pass except Exception as ex: diff --git a/trains/model.py b/trains/model.py index 4b32b9ae..d599db9e 100644 --- a/trains/model.py +++ b/trains/model.py @@ -810,7 +810,7 @@ class OutputModel(BaseModel): framework=self.framework or framework, comment=comment, cb=delete_previous_weights_file if auto_delete_file else None, - iteration=iteration or self._task.data.last_iteration, + iteration=iteration or self._task.get_last_iteration(), ) elif register_uri: register_uri = StorageHelper.conform_url(register_uri) diff --git a/trains/task.py b/trains/task.py index f95a378a..00ee3f1a 100644 --- a/trains/task.py +++ b/trains/task.py @@ -664,6 +664,27 @@ class Task(_Task): """ super(Task, self).set_model_label_enumeration(enumeration=enumeration) + def get_last_iteration(self): + """ + Return the last reported iteration (i.e. the maximum iteration the task reported a metric for) + Notice, this is not a cached call, it will ask the backend for the answer (no local caching) + + :return integer, last reported iteration number + """ + self.reload() + return self.data.last_iteration + + def set_last_iteration(self, last_iteration): + """ + Forcefully set the last reported iteration + (i.e. the maximum iteration the task reported a metric for) + + :param last_iteration: last reported iteration number + :type last_iteration: integer + """ + self.data.last_iteration = int(last_iteration) + self._edit(last_iteration=self.data.last_iteration) + def _connect_output_model(self, model): assert isinstance(model, OutputModel) model.connect(self) diff --git a/trains/utilities/resource_monitor.py b/trains/utilities/resource_monitor.py index e3f038bf..75f016bc 100644 --- a/trains/utilities/resource_monitor.py +++ b/trains/utilities/resource_monitor.py @@ -4,6 +4,7 @@ from threading import Thread, Event import psutil from pathlib2 import Path from typing import Text +from ..binding.frameworks.tensorflow_bind import IsTensorboardInit try: import gpustat @@ -15,10 +16,13 @@ class ResourceMonitor(object): _title_machine = ':monitor:machine' _title_gpu = ':monitor:gpu' - def __init__(self, task, measure_frequency_times_per_sec=2., report_frequency_sec=30.): + def __init__(self, task, sample_frequency_per_sec=2., report_frequency_sec=30., + first_report_sec=None, wait_for_first_iteration_to_start_sec=180.): self._task = task - self._measure_frequency = measure_frequency_times_per_sec + self._sample_frequency = sample_frequency_per_sec self._report_frequency = report_frequency_sec + self._first_report_sec = first_report_sec or report_frequency_sec + self._wait_for_first_iteration = wait_for_first_iteration_to_start_sec self._num_readouts = 0 self._readouts = {} self._previous_readouts = {} @@ -41,11 +45,18 @@ class ResourceMonitor(object): def _daemon(self): logger = self._task.get_logger() seconds_since_started = 0 + reported = 0 + last_iteration = 0 + last_iteration_ts = 0 + last_iteration_interval = None + repeated_iterations = 0 + fallback_to_sec_as_iterations = 0 while True: last_report = time() - while (time() - last_report) < self._report_frequency: - # wait for self._measure_frequency seconds, if event set quit - if self._exit_event.wait(1.0 / self._measure_frequency): + current_report_frequency = self._report_frequency if reported != 0 else self._first_report_sec + while (time() - last_report) < current_report_frequency: + # wait for self._sample_frequency seconds, if event set quit + if self._exit_event.wait(1.0 / self._sample_frequency): return # noinspection PyBroadException try: @@ -53,15 +64,43 @@ class ResourceMonitor(object): except Exception: pass + reported += 1 average_readouts = self._get_average_readouts() seconds_since_started += int(round(time() - last_report)) + # check if we do not report any metric (so it means the last iteration will not be changed) + if fallback_to_sec_as_iterations is None: + if IsTensorboardInit.tensorboard_used(): + fallback_to_sec_as_iterations = False + elif seconds_since_started >= self._wait_for_first_iteration: + fallback_to_sec_as_iterations = True + + # if we do not have last_iteration, we just use seconds as iteration + if fallback_to_sec_as_iterations: + iteration = seconds_since_started + else: + iteration = self._task.get_last_iteration() + if iteration == last_iteration: + repeated_iterations += 1 + if last_iteration_interval: + # to be on the safe side, we don't want to pass the actual next iteration + iteration += int(0.95*last_iteration_interval[0] * (seconds_since_started - last_iteration_ts) + / last_iteration_interval[1]) + else: + iteration += 1 + else: + last_iteration_interval = (iteration - last_iteration, seconds_since_started - last_iteration_ts) + last_iteration_ts = seconds_since_started + last_iteration = iteration + repeated_iterations = 0 + fallback_to_sec_as_iterations = False + for k, v in average_readouts.items(): # noinspection PyBroadException try: title = self._title_gpu if k.startswith('gpu_') else self._title_machine # 3 points after the dot value = round(v*1000) / 1000. - logger.report_scalar(title=title, series=k, iteration=seconds_since_started, value=value) + logger.report_scalar(title=title, series=k, iteration=iteration, value=value) except Exception: pass self._clear_readouts()