diff --git a/trains/backend_interface/task/task.py b/trains/backend_interface/task/task.py
index 8ae5375d..58060eab 100644
--- a/trains/backend_interface/task/task.py
+++ b/trains/backend_interface/task/task.py
@@ -191,7 +191,7 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
                     latest_version[0]),
             )
 
-        check_package_update_thread = Thread(target=check_package_update)
+        check_package_update_thread = Thread(target=check_package_update, daemon=True)
         check_package_update_thread.start()
         result = ScriptInfo.get(log=self.log)
         for msg in result.warning_messages:
diff --git a/trains/task.py b/trains/task.py
index 02675c2c..f95a378a 100644
--- a/trains/task.py
+++ b/trains/task.py
@@ -33,6 +33,7 @@ from .utilities.args import argparser_parseargs_called, get_argparser_last_args,
 from .binding.frameworks.pytorch_bind import PatchPyTorchModelIO
 from .binding.frameworks.tensorflow_bind import PatchSummaryToEventTransformer, PatchTensorFlowEager, \
     PatchKerasModelIO, PatchTensorflowModelIO
+from .utilities.resource_monitor import ResourceMonitor
 from .binding.matplotlib_bind import PatchedMatplotlib
 from .utilities.seed import make_deterministic
@@ -102,6 +103,7 @@ class Task(_Task):
         self._dev_mode_periodic_flag = False
         self._connected_parameter_type = None
         self._detect_repo_async_thread = None
+        self._resource_monitor = None
         # register atexit, so that we mark the task as stopped
         self._at_exit_called = False
         self.__register_at_exit(self._at_exit)
@@ -124,6 +126,7 @@ class Task(_Task):
             output_uri=None,
             auto_connect_arg_parser=True,
             auto_connect_frameworks=True,
+            auto_resource_monitoring=True,
     ):
         """
         Return the Task object for the main execution task (task context).
@@ -141,6 +144,8 @@ class Task(_Task):
             if set to false, you can manually connect the ArgParser with task.connect(parser)
         :param auto_connect_frameworks: If true automatically patch MatplotLib, Keras callbacks, and TensorBoard/X to
             serialize plots, graphs and model location to trains backend (in addition to original output destination)
+        :param auto_resource_monitoring: If true, machine vitals are reported alongside the task scalars; the
+            resource graphs appear under the ':monitor:machine' and ':monitor:gpu' titles in the scalars tab.
         :return: Task() object
         """
@@ -220,6 +225,9 @@ class Task(_Task):
                     PatchKerasModelIO.update_current_task(task)
                     PatchTensorflowModelIO.update_current_task(task)
                     PatchPyTorchModelIO.update_current_task(task)
+                if auto_resource_monitoring:
+                    task._resource_monitor = ResourceMonitor(task)
+                    task._resource_monitor.start()
 
             # Check if parse args already called. If so, sync task parameters with parser
             if argparser_parseargs_called():
                 parser, parsed_args = get_argparser_last_args()
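For context, the new flag is consumed once at Task.init time; a minimal usage sketch (the project and task names are illustrative):

    from trains import Task

    # resource monitoring is on by default; pass auto_resource_monitoring=False to opt out
    task = Task.init(project_name='examples', task_name='resource monitor demo')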
@@ -409,7 +417,7 @@ class Task(_Task):
         # make sure everything is in sync
         task.reload()
         # make sure we see something in the UI
-        threading.Thread(target=LoggerRoot.flush).start()
+        threading.Thread(target=LoggerRoot.flush, daemon=True).start()
 
         return task
 
     @staticmethod
@@ -944,6 +952,9 @@ class Task(_Task):
                 self.log.info('Finished uploading')
             else:
                 self._logger._flush_stdout_handler()
+            # stop resource monitoring
+            if self._resource_monitor:
+                self._resource_monitor.stop()
             self._logger.set_flush_period(None)
             # this is so in theory we can close a main task and start a new one
             Task.__main_task = None
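Both helper threads above are now created with daemon=True, so a finished script is no longer kept alive by a pending flush or version-check thread. A minimal sketch of the behavior this avoids (the sleep stands in for a slow network call):

    import time
    from threading import Thread

    def slow_helper():
        time.sleep(60)  # stands in for a slow version check or log flush

    # with daemon=True the interpreter may exit while the helper still runs;
    # without it, the process would hang here for a full minute after main ends
    Thread(target=slow_helper, daemon=True).start()
    print('main done')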
diff --git a/trains/utilities/resource_monitor.py b/trains/utilities/resource_monitor.py
new file mode 100644
index 00000000..e3f038bf
--- /dev/null
+++ b/trains/utilities/resource_monitor.py
@@ -0,0 +1,138 @@
+from time import time
+from threading import Thread, Event
+
+import psutil
+from pathlib2 import Path
+from typing import Text
+
+try:
+    import gpustat
+except ImportError:
+    gpustat = None
+
+
+class ResourceMonitor(object):
+    _title_machine = ':monitor:machine'
+    _title_gpu = ':monitor:gpu'
+
+    def __init__(self, task, measure_frequency_times_per_sec=2., report_frequency_sec=30.):
+        self._task = task
+        self._measure_frequency = measure_frequency_times_per_sec
+        self._report_frequency = report_frequency_sec
+        self._num_readouts = 0
+        self._readouts = {}
+        self._previous_readouts = {}
+        self._previous_readouts_ts = time()
+        self._thread = None
+        self._exit_event = Event()
+        if not gpustat:
+            self._task.get_logger().console('TRAINS Monitor: GPU monitoring is not available, '
+                                            'run "pip install gpustat"')
+
+    def start(self):
+        self._exit_event.clear()
+        self._thread = Thread(target=self._daemon, daemon=True)
+        self._thread.start()
+
+    def stop(self):
+        self._exit_event.set()
+        # the daemon thread exits on its next wait() wakeup; no need to join
+
+    def _daemon(self):
+        logger = self._task.get_logger()
+        seconds_since_started = 0
+        while True:
+            last_report = time()
+            # keep sampling until a full report period has elapsed
+            while (time() - last_report) < self._report_frequency:
+                # wait one sample interval; if the exit event is set, quit
+                if self._exit_event.wait(1.0 / self._measure_frequency):
+                    return
+                # noinspection PyBroadException
+                try:
+                    self._update_readouts()
+                except Exception:
+                    pass
+
+            average_readouts = self._get_average_readouts()
+            seconds_since_started += int(round(time() - last_report))
+            for k, v in average_readouts.items():
+                # noinspection PyBroadException
+                try:
+                    title = self._title_gpu if k.startswith('gpu_') else self._title_machine
+                    # round to three digits after the decimal point
+                    value = round(v * 1000) / 1000.
+                    logger.report_scalar(title=title, series=k, iteration=seconds_since_started, value=value)
+                except Exception:
+                    pass
+            self._clear_readouts()
+
+    def _update_readouts(self):
+        readouts = self._machine_stats()
+        elapsed = time() - self._previous_readouts_ts
+        self._previous_readouts_ts = time()
+        for k, v in readouts.items():
+            # cumulative counters (network/disk io) are converted to per-second rates
+            if k.endswith('_mbs'):
+                v = (v - self._previous_readouts.get(k, v)) / elapsed
+            self._readouts[k] = self._readouts.get(k, 0.0) + v
+        self._num_readouts += 1
+        self._previous_readouts = readouts
+
+    def _get_num_readouts(self):
+        return self._num_readouts
+
+    def _get_average_readouts(self):
+        average_readouts = dict((k, v / float(self._num_readouts)) for k, v in self._readouts.items())
+        return average_readouts
+
+    def _clear_readouts(self):
+        self._readouts = {}
+        self._num_readouts = 0
+
+    @staticmethod
+    def _machine_stats():
+        """
+        :return: machine stats dictionary; units are indicated by the key suffix
+            (percent for *_usage/*_percent, GB for *_gb, cumulative MB for *_mbs)
+        """
+        cpu_usage = [float(v) for v in psutil.cpu_percent(percpu=True)]
+        stats = {
+            "cpu_usage": sum(cpu_usage) / float(len(cpu_usage)),
+        }
+
+        bytes_per_megabyte = 1024 ** 2
+
+        def bytes_to_megabytes(x):
+            return x / bytes_per_megabyte
+
+        virtual_memory = psutil.virtual_memory()
+        stats["memory_used_gb"] = bytes_to_megabytes(virtual_memory.used) / 1024
+        stats["memory_free_gb"] = bytes_to_megabytes(virtual_memory.available) / 1024
+        disk_use_percentage = psutil.disk_usage(Text(Path.home())).percent
+        stats["disk_free_percent"] = 100.0 - disk_use_percentage
+        sensor_stat = (
+            psutil.sensors_temperatures() if hasattr(psutil, "sensors_temperatures") else {}
+        )
+        if "coretemp" in sensor_stat and len(sensor_stat["coretemp"]):
+            stats["cpu_temperature"] = max([float(t.current) for t in sensor_stat["coretemp"]])
+
+        # cumulative counters, converted to rates in _update_readouts()
+        net_stats = psutil.net_io_counters()
+        stats["network_tx_mbs"] = bytes_to_megabytes(net_stats.bytes_sent)
+        stats["network_rx_mbs"] = bytes_to_megabytes(net_stats.bytes_recv)
+        io_stats = psutil.disk_io_counters()
+        stats["io_read_mbs"] = bytes_to_megabytes(io_stats.read_bytes)
+        stats["io_write_mbs"] = bytes_to_megabytes(io_stats.write_bytes)
+
+        # check if we can access the gpu statistics
+        if gpustat:
+            gpu_stat = gpustat.new_query()
+            for i, g in enumerate(gpu_stat.gpus):
+                stats["gpu_%d_temperature" % i] = float(g["temperature.gpu"])
+                stats["gpu_%d_utilization" % i] = float(g["utilization.gpu"])
+                stats["gpu_%d_mem_usage" % i] = 100. * float(g["memory.used"]) / float(g["memory.total"])
+                # memory.used / memory.total are already in MBs
+                stats["gpu_%d_mem_free_gb" % i] = float(g["memory.total"] - g["memory.used"]) / 1024
+                stats["gpu_%d_mem_used_gb" % i] = float(g["memory.used"]) / 1024
+
+        return stats
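The *_mbs series are cumulative byte counters from psutil, turned into per-second rates by differencing consecutive samples in _update_readouts(). A minimal standalone sketch of the same conversion (the function name is illustrative):

    import time

    import psutil

    def network_tx_rate_mbs(interval_sec=1.0):
        # sample the cumulative bytes_sent counter twice and return MB/s
        mb = 1024 ** 2
        start = psutil.net_io_counters().bytes_sent
        time.sleep(interval_sec)
        end = psutil.net_io_counters().bytes_sent
        return (end - start) / float(mb) / interval_sec

    print('network tx: %.3f MB/s' % network_tx_rate_mbs())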