import json
import os
from functools import partial
from logging import warning
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
from time import time

from humanfriendly import format_timespan
from pathlib2 import Path

from ...backend_api.services import events as api_events
from ..base import InterfaceBase
from ...config import config
from ...debugging import get_logger
from ...storage.helper import StorageHelper

from .events import MetricsEventAdapter


log = get_logger('metrics')


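# Metrics batches ClearML report events, uploads any files they reference to the configured
# storage, and sends the events to the backend events API. It also backs offline mode, where
# event batches are appended to a local metrics.jsonl file instead of being sent.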
class Metrics(InterfaceBase):
    """ Metrics manager and batch writer """
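    # Class-level state shared by all Metrics instances: a lock serializing StorageHelper
    # creation across upload worker threads, and thread pools that are created lazily by
    # _initialize_upload_pools() and torn down by close_async_threads().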
    _storage_lock = Lock()
    _file_upload_starvation_warning_sec = config.get('network.metrics.file_upload_starvation_warning_sec', None)
    _file_upload_retries = 3
    _upload_pool = None
    _file_upload_pool = None
    __offline_filename = 'metrics.jsonl'

    @property
    def storage_key_prefix(self):
        return self._storage_key_prefix

    def _get_storage(self, storage_uri=None):
        """ Storage helper used to upload files """
        try:
            # use a lock since this storage object will be requested by thread pool threads, so we need to make sure
            # any singleton initialization will occur only once
            self._storage_lock.acquire()
            storage_uri = storage_uri or self._storage_uri
            return StorageHelper.get(storage_uri)
        except Exception as e:
            log.error('Failed getting storage helper for %s: %s' % (storage_uri, str(e)))
        finally:
            self._storage_lock.release()

    def __init__(self, session, task, storage_uri, storage_uri_suffix='metrics', iteration_offset=0, log=None):
        super(Metrics, self).__init__(session, log=log)
        self._task_id = task.id
        self._task_iteration_offset = iteration_offset
        self._storage_uri = storage_uri.rstrip('/') if storage_uri else None
        self._storage_key_prefix = storage_uri_suffix.strip('/') if storage_uri_suffix else None
        self._file_related_event_time = None
        self._file_upload_time = None
        self._offline_log_filename = None
        if self._offline_mode:
            offline_folder = Path(task.get_offline_mode_folder())
            offline_folder.mkdir(parents=True, exist_ok=True)
            self._offline_log_filename = offline_folder / self.__offline_filename

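    # Illustrative usage sketch (hypothetical caller code, not part of this module): assuming
    # `metrics` is a Metrics instance and `events` is a list of MetricsEventAdapter objects,
    #   result = metrics.write_events(events, async_enable=False)   # blocking send
    #   async_res = metrics.write_events(events, async_enable=True,
    #                                    callback=lambda res: None)  # queued on the upload pool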
    def write_events(self, events, async_enable=True, callback=None, **kwargs):
        """
        Write events to the backend, uploading any required files.
        :param events: A list of event objects
        :param async_enable: If True, the upload is performed asynchronously and an AsyncResult object is returned,
            otherwise a blocking call is made and the upload result is returned.
        :param callback: An optional callback called when the upload completes (used only if async_enable is True)
        :return: A backend_api.session.CallResult if async_enable is False, otherwise an AsyncResult. Note that if
            no events were sent, None is returned.
        """
        if not events:
            return

        storage_uri = kwargs.pop('storage_uri', self._storage_uri)

        if not async_enable:
            return self._do_write_events(events, storage_uri)

        def safe_call(*args, **kwargs):
            try:
                return self._do_write_events(*args, **kwargs)
            except Exception as e:
                return e

        self._initialize_upload_pools()
        return self._upload_pool.apply_async(
            safe_call,
            args=(events, storage_uri),
            callback=partial(self._callback_wrapper, callback))

    def set_iteration_offset(self, offset):
        self._task_iteration_offset = offset

    def get_iteration_offset(self):
        return self._task_iteration_offset

    def _callback_wrapper(self, callback, res):
        """ A wrapper for the async callback for handling common errors """
        if not res:
            # no result yet
            return
        elif isinstance(res, Exception):
            # error
            self.log.error('Error trying to send metrics: %s' % str(res))
        elif not res.ok():
            # bad result, log error
            self.log.error('Failed reporting metrics: %s' % str(res.meta))
        # call callback, even if we received an error
        if callback:
            callback(res)

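    # _do_write_events is the synchronous workhorse behind write_events(): it resolves upload
    # targets for events that carry a file payload, uploads those files on the shared file
    # upload pool, and then sends all events whose upload succeeded as a single batched request
    # (or appends them as one JSON line to metrics.jsonl when in offline mode).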
    def _do_write_events(self, events, storage_uri=None):
        """ Sends an iterable of events as a series of batch operations. Note: metric send does not raise on error """
        assert isinstance(events, (list, tuple))
        assert all(isinstance(x, MetricsEventAdapter) for x in events)

        # def event_key(ev):
        #     return (ev.metric, ev.variant)
        #
        # events = sorted(events, key=event_key)
        # multiple_events_for = [k for k, v in groupby(events, key=event_key) if len(list(v)) > 1]
        # if multiple_events_for:
        #     log.warning(
        #         'More than one metrics event sent for these metric/variant combinations in a report: %s' %
        #         ', '.join('%s/%s' % k for k in multiple_events_for))

        storage_uri = storage_uri or self._storage_uri

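        # update_and_get_file_entry() stamps each event with the task id and iteration offset;
        # for events that reference a file it computes the target storage key/url and returns
        # the file entry that still needs to be uploaded (or None if there is nothing to upload).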
        def update_and_get_file_entry(ev):
            entry = ev.get_file_entry()
            kwargs = {}
            if entry:
                key, url = ev.get_target_full_upload_uri(storage_uri, self.storage_key_prefix, quote_uri=False)
                kwargs[entry.key_prop] = key
                kwargs[entry.url_prop] = url
                if not entry.stream:
                    # if entry has no stream, we won't upload it
                    entry = None
                else:
                    if not isinstance(entry.stream, Path) and not hasattr(entry.stream, 'read'):
                        raise ValueError('Invalid file object %s' % entry.stream)
                    entry.url = url
            ev.update(task=self._task_id, iter_offset=self._task_iteration_offset, **kwargs)
            return entry

        # prepare events needing file upload
        entries = []
        for ev in events[:]:
            try:
                e = update_and_get_file_entry(ev)
                if e:
                    entries.append(e)
            except Exception as ex:
                log.warning(str(ex))
                events.remove(ev)

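        # Uploads run concurrently on the class-level file upload pool; a failed upload marks
        # the event with an exception so it is excluded from the batched send below.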
        # upload the needed files
        if entries:
            # upload files
            def upload(e):
                upload_uri = e.upload_uri or storage_uri

                try:
                    storage = self._get_storage(upload_uri)
                    if isinstance(e.stream, Path):
                        url = storage.upload(e.stream.as_posix(), e.url, retries=self._file_upload_retries)
                    else:
                        url = storage.upload_from_stream(e.stream, e.url, retries=self._file_upload_retries)
                    e.event.update(url=url)
                except Exception as exp:
                    log.warning("Failed uploading to {} ({})".format(
                        upload_uri if upload_uri else "(Could not calculate upload uri)",
                        exp,
                    ))

                    e.set_exception(exp)
                if not isinstance(e.stream, Path):
                    e.stream.close()
                if e.delete_local_file:
                    # noinspection PyBroadException
                    try:
                        Path(e.delete_local_file).unlink()
                    except Exception:
                        pass

            self._initialize_upload_pools()
            res = self._file_upload_pool.map_async(upload, entries)
            res.wait()

            # remember the last time we uploaded a file
            self._file_upload_time = time()

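        # Starvation check: warn if the most recent file-related event is newer than the last
        # completed file upload by more than the configured threshold
        # (network.metrics.file_upload_starvation_warning_sec).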
        t_f, t_u, t_ref = \
            (self._file_related_event_time, self._file_upload_time, self._file_upload_starvation_warning_sec)
        if t_f and t_u and t_ref and (t_f - t_u) > t_ref:
            log.warning('Possible metrics file upload starvation: files were not uploaded for %s' %
                        format_timespan(t_ref))

        # send the events in a batched request
        good_events = [ev for ev in events if ev.upload_exception is None]
        error_events = [ev for ev in events if ev.upload_exception is not None]

        if error_events:
            log.error("Not uploading {}/{} events because the data upload failed".format(
                len(error_events),
                len(events),
            ))

        if good_events:
            _events = [ev.get_api_event() for ev in good_events]
            batched_requests = [api_events.AddRequest(event=ev) for ev in _events if ev]
            if batched_requests:
                if self._offline_mode:
                    with open(self._offline_log_filename.as_posix(), 'at') as f:
                        f.write(json.dumps([b.to_dict() for b in batched_requests]) + '\n')
                    return

                req = api_events.AddBatchRequest(requests=batched_requests)
                return self.send(req, raise_on_errors=False)

        return None

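    # The event pool is intentionally a single worker, so asynchronous write_events() calls are
    # processed one batch at a time in submission order; file uploads get their own pool whose
    # size comes from network.metrics.file_upload_threads (default 4).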
    @staticmethod
    def _initialize_upload_pools():
        if not Metrics._upload_pool:
            Metrics._upload_pool = ThreadPool(processes=1)
        if not Metrics._file_upload_pool:
            Metrics._file_upload_pool = ThreadPool(
                processes=config.get('network.metrics.file_upload_threads', 4))

    @staticmethod
    def close_async_threads():
        file_pool = Metrics._file_upload_pool
        Metrics._file_upload_pool = None
        pool = Metrics._upload_pool
        Metrics._upload_pool = None

        if file_pool:
            # noinspection PyBroadException
            try:
                file_pool.terminate()
                file_pool.join()
            except Exception:
                pass

        if pool:
            # noinspection PyBroadException
            try:
                pool.terminate()
                pool.join()
            except Exception:
                pass

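    # report_offline_session() replays a metrics.jsonl file recorded in offline mode: each line
    # holds one batch of event dicts, which are re-targeted to the new task id, their debug
    # samples (and plotly-embedded image sources) re-uploaded to the report storage, and the
    # batch re-sent to the backend. Hypothetical call (the path is illustrative only):
    #   Metrics.report_offline_session(task, '/path/to/offline/task_folder')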
    @classmethod
    def report_offline_session(cls, task, folder):
        from ... import StorageManager
        filename = Path(folder) / cls.__offline_filename
        if not filename.is_file():
            return False
        # noinspection PyProtectedMember
        remote_url = task._get_default_report_storage_uri()
        if remote_url and remote_url.endswith('/'):
            remote_url = remote_url[:-1]
        uploaded_files = set()
        task_id = task.id
        with open(filename.as_posix(), 'rt') as f:
            i = 0
            while True:
                try:
                    line = f.readline()
                    if not line:
                        break
                    list_requests = json.loads(line)
                    for r in list_requests:
                        org_task_id = r['task']
                        r['task'] = task_id
                        if r.get('key') and r.get('url'):
                            debug_sample = (Path(folder) / 'data').joinpath(*(r['key'].split('/')))
                            r['key'] = r['key'].replace(
                                '.{}{}'.format(org_task_id, os.sep), '.{}{}'.format(task_id, os.sep), 1)
                            r['url'] = '{}/{}'.format(remote_url, r['key'])
                            if debug_sample not in uploaded_files and debug_sample.is_file():
                                uploaded_files.add(debug_sample)
                                StorageManager.upload_file(local_file=debug_sample.as_posix(), remote_url=r['url'])
                        elif r.get('plot_str'):
                            # hack plotly embedded images links
                            # noinspection PyBroadException
                            try:
                                task_id_sep = '.{}{}'.format(org_task_id, os.sep)
                                plot = json.loads(r['plot_str'])
                                if plot.get('layout', {}).get('images'):
                                    for image in plot['layout']['images']:
                                        if task_id_sep not in image['source']:
                                            continue
                                        pre, post = image['source'].split(task_id_sep, 1)
                                        pre = os.sep.join(pre.split(os.sep)[-2:])
                                        debug_sample = (Path(folder) / 'data').joinpath(
                                            pre + '.{}'.format(org_task_id), post)
                                        image['source'] = '/'.join(
                                            [remote_url, pre + '.{}'.format(task_id), post])
                                        if debug_sample not in uploaded_files and debug_sample.is_file():
                                            uploaded_files.add(debug_sample)
                                            StorageManager.upload_file(
                                                local_file=debug_sample.as_posix(), remote_url=image['source'])
                                r['plot_str'] = json.dumps(plot)
                            except Exception:
                                pass
                    i += 1
                except StopIteration:
                    break
                except Exception as ex:
                    warning('Failed reporting metric, line {} [{}]'.format(i, ex))
                batch_requests = api_events.AddBatchRequest(requests=list_requests)
                res = task.session.send(batch_requests)
                if res and not res.ok():
                    warning("failed logging metric task to backend ({:d} lines, {})".format(
                        len(batch_requests.requests), str(res.meta)))
        return True