# clearml/trains/backend_interface/metrics/interface.py
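"""
Metrics batching and upload interface for the Trains / ClearML backend.

The `Metrics` class batches reported events, uploads any attached files
(e.g. debug samples) to the configured storage, and sends the events to the
backend in batched requests, either synchronously or via background thread
pools. In offline mode, event batches are written to a local JSONL file and
can later be replayed with `Metrics.report_offline_session()`.

Illustrative usage sketch (hypothetical values; in practice this class is
driven internally by the SDK's reporting machinery rather than used directly):

    metrics = Metrics(session, task, storage_uri='s3://bucket/metrics')
    metrics.write_events(events, async_enable=False)
"""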

import json
import os
from functools import partial
from logging import warning
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
from time import time

from humanfriendly import format_timespan
from pathlib2 import Path

from ...backend_api.services import events as api_events
from ..base import InterfaceBase
from ...config import config
from ...debugging import get_logger
from ...storage.helper import StorageHelper
from .events import MetricsEventAdapter

log = get_logger('metrics')

class Metrics(InterfaceBase):
    """ Metrics manager and batch writer """
    _storage_lock = Lock()
    _file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None)
    _file_upload_retries = 3
    _upload_pool = None
    _file_upload_pool = None
    __offline_filename = 'metrics.jsonl'

    @property
    def storage_key_prefix(self):
        return self._storage_key_prefix

    def _get_storage(self, storage_uri=None):
        """ Storage helper used to upload files """
        try:
            # use a lock since this storage object will be requested by thread pool threads, so we need to make sure
            # any singleton initialization will occur only once
            self._storage_lock.acquire()
            storage_uri = storage_uri or self._storage_uri
            return StorageHelper.get(storage_uri)
        except Exception as e:
            log.error('Failed getting storage helper for %s: %s' % (storage_uri, str(e)))
        finally:
            self._storage_lock.release()

    def __init__(self, session, task, storage_uri, storage_uri_suffix='metrics', iteration_offset=0, log=None):
        super(Metrics, self).__init__(session, log=log)
        self._task_id = task.id
        self._task_iteration_offset = iteration_offset
        self._storage_uri = storage_uri.rstrip('/') if storage_uri else None
        self._storage_key_prefix = storage_uri_suffix.strip('/') if storage_uri_suffix else None
        self._file_related_event_time = None
        self._file_upload_time = None
        self._offline_log_filename = None
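        # in offline mode events are not sent; they are appended to a local
        # JSONL file that report_offline_session() later replays to the backend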
        if self._offline_mode:
            offline_folder = Path(task.get_offline_mode_folder())
            offline_folder.mkdir(parents=True, exist_ok=True)
            self._offline_log_filename = offline_folder / self.__offline_filename

    def write_events(self, events, async_enable=True, callback=None, **kwargs):
        """
        Write events to the backend, uploading any required files.

        :param events: A list of event objects
        :param async_enable: If True, the upload is performed asynchronously and an AsyncResult object is returned,
            otherwise a blocking call is made and the upload result is returned.
        :param callback: An optional callback invoked when the upload completes (used only when async_enable is True)
        :return: .backend_api.session.CallResult if async_enable is False, otherwise AsyncResult. Note that if no
            events were sent, None will be returned.
        """
        if not events:
            return

        storage_uri = kwargs.pop('storage_uri', self._storage_uri)

        if not async_enable:
            return self._do_write_events(events, storage_uri)
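        # runs inside the upload pool; exceptions are returned rather than
        # raised so _callback_wrapper can log them from the async callback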
        def safe_call(*args, **kwargs):
            try:
                return self._do_write_events(*args, **kwargs)
            except Exception as e:
                return e

        self._initialize_upload_pools()
        return self._upload_pool.apply_async(
            safe_call,
            args=(events, storage_uri),
            callback=partial(self._callback_wrapper, callback))

    def set_iteration_offset(self, offset):
        self._task_iteration_offset = offset

    def get_iteration_offset(self):
        return self._task_iteration_offset

    def _callback_wrapper(self, callback, res):
        """ A wrapper for the async callback for handling common errors """
        if not res:
            # no result yet
            return
        elif isinstance(res, Exception):
            # error
            self.log.error('Error trying to send metrics: %s' % str(res))
        elif not res.ok():
            # bad result, log error
            self.log.error('Failed reporting metrics: %s' % str(res.meta))

        # call callback, even if we received an error
        if callback:
            callback(res)

    def _do_write_events(self, events, storage_uri=None):
        """ Send an iterable of events as a series of batch operations. Note: metric send does not raise on error """
        assert isinstance(events, (list, tuple))
        assert all(isinstance(x, MetricsEventAdapter) for x in events)

        # def event_key(ev):
        #     return (ev.metric, ev.variant)
        #
        # events = sorted(events, key=event_key)
        # multiple_events_for = [k for k, v in groupby(events, key=event_key) if len(list(v)) > 1]
        # if multiple_events_for:
        #     log.warning(
        #         'More than one metrics event sent for these metric/variant combinations in a report: %s' %
        #         ', '.join('%s/%s' % k for k in multiple_events_for))
        storage_uri = storage_uri or self._storage_uri

        def update_and_get_file_entry(ev):
            entry = ev.get_file_entry()
            kwargs = {}
            if entry:
                key, url = ev.get_target_full_upload_uri(storage_uri, self.storage_key_prefix, quote_uri=False)
                kwargs[entry.key_prop] = key
                kwargs[entry.url_prop] = url
                if not entry.stream:
                    # if entry has no stream, we won't upload it
                    entry = None
                else:
                    if not hasattr(entry.stream, 'read'):
                        raise ValueError('Invalid file object %s' % entry.stream)
                    entry.url = url
            ev.update(task=self._task_id, iter_offset=self._task_iteration_offset, **kwargs)
            return entry
        # prepare events needing file upload (iterate over a copy so we can
        # safely remove failed events from the original list)
        entries = []
        for ev in events[:]:
            try:
                e = update_and_get_file_entry(ev)
                if e:
                    entries.append(e)
            except Exception as ex:
                log.warning(str(ex))
                events.remove(ev)

        # upload the needed files
        if entries:
            # upload files
            def upload(e):
                upload_uri = e.upload_uri or storage_uri

                try:
                    storage = self._get_storage(upload_uri)
                    url = storage.upload_from_stream(e.stream, e.url, retries=self._file_upload_retries)
                    e.event.update(url=url)
                except Exception as exp:
                    log.warning("Failed uploading to {} ({})".format(
                        upload_uri if upload_uri else "(Could not calculate upload uri)",
                        exp,
                    ))
                    e.set_exception(exp)
                e.stream.close()
                if e.delete_local_file:
                    # noinspection PyBroadException
                    try:
                        Path(e.delete_local_file).unlink()
                    except Exception:
                        pass
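            # fan the uploads out over the file-upload pool and block until
            # they all finish (map_async + wait keeps this call synchronous)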
            self._initialize_upload_pools()
            res = self._file_upload_pool.map_async(upload, entries)
            res.wait()

            # remember the last time we uploaded a file
            self._file_upload_time = time()

        t_f, t_u, t_ref = \
            (self._file_related_event_time, self._file_upload_time, self._file_upload_starvation_warning_sec)
        if t_f and t_u and t_ref and (t_f - t_u) > t_ref:
            log.warning('Possible metrics file upload starvation: files were not uploaded for %s' %
                        format_timespan(t_ref))

        # send the events in a batched request
        good_events = [ev for ev in events if ev.upload_exception is None]
        error_events = [ev for ev in events if ev.upload_exception is not None]

        if error_events:
            log.error("Not uploading {}/{} events because the data upload failed".format(
                len(error_events),
                len(events),
            ))

        if good_events:
            _events = [ev.get_api_event() for ev in good_events]
            batched_requests = [api_events.AddRequest(event=ev) for ev in _events if ev]
            if batched_requests:
                if self._offline_mode:
if self._offline_mode:
with open(self._offline_log_filename.as_posix(), 'at') as f:
f.write(json.dumps([b.to_dict() for b in batched_requests])+'\n')
return
req = api_events.AddBatchRequest(requests=batched_requests)
return self.send(req, raise_on_errors=False)
return None

    @staticmethod
    def _initialize_upload_pools():
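        # a single worker serializes event batches (they are sent one at a
        # time, in submission order); file uploads are I/O bound and get more threads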
        if not Metrics._upload_pool:
            Metrics._upload_pool = ThreadPool(processes=1)
        if not Metrics._file_upload_pool:
            Metrics._file_upload_pool = ThreadPool(
                processes=config.get('network.metrics.file_upload_threads', 4))

    @staticmethod
    def close_async_threads():
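        # detach the pools first so no new work can be queued, then terminate;
        # note that terminate() may drop uploads that are still pending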
        file_pool = Metrics._file_upload_pool
        Metrics._file_upload_pool = None
        pool = Metrics._upload_pool
        Metrics._upload_pool = None

        if file_pool:
            # noinspection PyBroadException
            try:
                file_pool.terminate()
                file_pool.join()
            except Exception:
                pass

        if pool:
            # noinspection PyBroadException
            try:
                pool.terminate()
                pool.join()
            except Exception:
                pass

    @classmethod
    def report_offline_session(cls, task, folder):
        from ... import StorageManager

        filename = Path(folder) / cls.__offline_filename
        if not filename.is_file():
            return False

        # noinspection PyProtectedMember
        remote_url = task._get_default_report_storage_uri()
        if remote_url and remote_url.endswith('/'):
            remote_url = remote_url[:-1]

        uploaded_files = set()
        task_id = task.id
        with open(filename.as_posix(), 'rt') as f:
            i = 0
            while True:
                try:
                    line = f.readline()
                    if not line:
                        break
                    list_requests = json.loads(line)
                    for r in list_requests:
                        org_task_id = r['task']
                        r['task'] = task_id
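                        # debug samples were stored under the offline task id;
                        # remap key/url to the new task id and re-upload them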
                        if r.get('key') and r.get('url'):
                            debug_sample = (Path(folder) / 'data').joinpath(*(r['key'].split('/')))
                            r['key'] = r['key'].replace(
                                '.{}{}'.format(org_task_id, os.sep), '.{}{}'.format(task_id, os.sep), 1)
                            r['url'] = '{}/{}'.format(remote_url, r['key'])
                            if debug_sample not in uploaded_files and debug_sample.is_file():
                                uploaded_files.add(debug_sample)
                                StorageManager.upload_file(local_file=debug_sample.as_posix(), remote_url=r['url'])
                        elif r.get('plot_str'):
                            # hack plotly embedded images links
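                            # plots can embed image URLs that still point at
                            # the offline task's local files; rewrite each
                            # source to the remote URL and upload the file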
                            # noinspection PyBroadException
                            try:
                                task_id_sep = '.{}{}'.format(org_task_id, os.sep)
                                plot = json.loads(r['plot_str'])
                                if plot.get('layout', {}).get('images'):
                                    for image in plot['layout']['images']:
                                        if task_id_sep not in image['source']:
                                            continue
                                        pre, post = image['source'].split(task_id_sep, 1)
                                        pre = os.sep.join(pre.split(os.sep)[-2:])
                                        debug_sample = (Path(folder) / 'data').joinpath(
                                            pre + '.{}'.format(org_task_id), post)
                                        image['source'] = '/'.join(
                                            [remote_url, pre + '.{}'.format(task_id), post])
                                        if debug_sample not in uploaded_files and debug_sample.is_file():
                                            uploaded_files.add(debug_sample)
                                            StorageManager.upload_file(
                                                local_file=debug_sample.as_posix(), remote_url=image['source'])
                                r['plot_str'] = json.dumps(plot)
                            except Exception:
                                pass
                    i += 1
                except StopIteration:
                    break
                except Exception as ex:
                    warning('Failed reporting metric, line {} [{}]'.format(i, ex))
                    # skip this line; do not resend the previous batch
                    continue
                batch_requests = api_events.AddBatchRequest(requests=list_requests)
                res = task.session.send(batch_requests)
                if res and not res.ok():
                    warning('Failed sending metric events to the backend ({:d} lines, {})'.format(
                        len(batch_requests.requests), str(res.meta)))
        return True